-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathdatabase.py
More file actions
234 lines (205 loc) · 9.59 KB
/
database.py
File metadata and controls
234 lines (205 loc) · 9.59 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
#!/usr/bin/python3
# -*- coding: utf-8 -*-
import abc
import getpass
import util
import pymysql
import os
import subprocess
import re
import csv
import zipfile
import copy
import traceback
class TableColumn:
    """One column of a CREATE TABLE statement.

    Attributes are plain strings that are concatenated into SQL as
    ``"<name> <option>"``; no quoting or validation is applied here.
    """

    def __init__(self, name, option):
        """Remember the column identifier and its SQL definition text.

        Args:
            name: Column name as it should appear in the statement.
            option: Text placed after the name (type, constraints, ...).
        """
        self.name, self.option = name, option
class MySqlBase:
    """Convenience wrapper around one pymysql connection and one cursor.

    The object stores the connection settings, opens the connection on
    connect(), and offers helpers to create databases/tables, run queries
    and export data to CSV or to a mysqldump SQL file.

    Most helpers report failure by printing a message and returning False
    instead of raising; export() is the exception and raises on failure.
    """

    def __init__(self, user, password, host, port):
        """Store the connection settings; nothing is connected yet.

        Args:
            user: MySQL account name.
            password: Account password, or None to be prompted in connect().
            host: Server host name or address.
            port: Server port; anything int() accepts.
        """
        self.user = user
        self.password = password
        self.host = host
        self.port = int(port)
        # Name of the database selected through use_database(), if any.
        self.database = None
        # Connection and cursor are created by connect().
        self.conn = None
        self.cur = None

    def default_table_column(self):
        """Return the columns create_table() uses when none are passed.

        The base implementation returns None; subclasses are expected to
        override this and return a list of TableColumn objects.
        """
        return None

    def _offer_drop(self, warning, drop_query, failure_message):
        """Warn that an object already exists and drop it if the user agrees."""
        print(warning)
        print("이전에 자동화하여 값을 넣었던 데이터베이스라면 새로운 구축을 위해 삭제하는 것을 권장하고 있습니다.")
        delete = input("삭제하시겠습니까? (미입력시 취소, y/yes) ")
        if delete in ("y", "yes"):
            if self.send_query(drop_query):
                print("정상적으로 삭제되었습니다.")
            else:
                print(failure_message)

    def create_table(self, table_name, columns=None, check_overwrite=True):
        """Create a table, optionally offering to drop an existing one first.

        Args:
            table_name: Name of the table to create (inserted verbatim).
            columns: Column definitions as a ready-made SQL string (the text
                between the parentheses). When None, the list returned by
                default_table_column() is used instead.
            check_overwrite: When true, ask util.has_table() whether the
                table exists and, if so, interactively offer to drop it.

        Returns:
            The result of send_query() for the CREATE TABLE statement, or
            False when no column definition is available.
        """
        if columns is None:
            print("Warning: The cloumns query value to create doesn't exist. Using default function")
            columns = self.default_table_column()
            if columns is None:
                print("Columns is not defined, Please override default_table_column() and return list of TableColumns")
                return False
            # NOTE(review): default columns are emitted in the reverse of the
            # order returned by default_table_column(); kept as-is because
            # subclasses may rely on it. Confirm this is intended.
            definitions = [
                ci.name + " " + ci.option
                for ci in reversed(list(columns))
                if isinstance(ci, TableColumn)
            ]
            if not definitions:
                # Without this guard the statement would lose its opening
                # parenthesis ("CREATE TABLE name)").
                print("Columns is not defined, Please override default_table_column() and return list of TableColumns")
                return False
            column_sql = ",".join(definitions)
        else:
            column_sql = columns
        query_string = "CREATE TABLE " + table_name + "(" + column_sql + ")"

        if check_overwrite and util.has_table(self, table_name):
            self._offer_drop(
                "Warning: 사용하려는 테이블이 이미 존재하고 있습니다: {name}".format(name=table_name),
                "DROP TABLE {db}".format(db=table_name),
                "테이블 삭제 실패",
            )
        return self.send_query(query_string)

    def send_query(self, query, o=None, output=True, debug=None):
        """Execute one statement on the cursor.

        Args:
            query: SQL text passed to cursor.execute().
            o: Optional parameters forwarded as the second execute() argument.
            output: When true, print the failure reason on error.
            debug: Optional dict; on error the exception text is stored as a
                key with ``o`` as its value.

        Returns:
            True when execute() finished without raising, False otherwise
            (including when a ``debug`` dict is supplied).
        """
        try:
            self.checkdb()
            if o is None:
                self.cur.execute(query)
            else:
                self.cur.execute(query, o)
        except Exception as e:
            # Deliberately broad: this method is the boundary that turns any
            # driver error into a False return for the callers above.
            if output:
                print("Failed executing query: '%s', Reason: " % query, end='', flush=True)
                print(e)
            if debug is not None:
                debug[str(e)] = o
            return False
        return True

    def save(self):
        """Commit the current transaction; raises if not connected."""
        self.checkdb()
        self.conn.commit()

    def disconnect(self):
        """Commit, then close the cursor and the connection.

        Returns:
            True on success, False (after printing the reason) on any error.
        """
        try:
            self.save()
            self.cur.close()
            self.conn.close()
        except Exception as e:
            print("Failed closed, Reason: ", end='', flush=True)
            print(e)
            return False
        return True

    def checkdb(self, chkdb=False):
        """Raise unless a connection (and optionally a database) is set.

        Args:
            chkdb: When true, also require that use_database() has selected
                a database.

        Raises:
            pymysql.DatabaseError: If connect() has not produced a connection
                or, with ``chkdb``, no database is selected.
        """
        if self.conn is None:
            raise pymysql.DatabaseError('Not connected the database. Please connect() at first')
        if chkdb and self.database is None:
            # The message now names use_database(), the method that exists.
            raise pymysql.DatabaseError(
                'What is use to database? Please use function use_database() or Execute the query string manually.')

    def connect(self, characterSet='utf8'):
        """Open the connection and create a DictCursor.

        Prompts for the password with getpass when none was given. pymysql
        errors are printed as a traceback rather than propagated, so callers
        should check ``self.conn`` afterwards.

        Args:
            characterSet: Connection charset passed to pymysql.connect().
        """
        if self.password is None:
            self.password = getpass.getpass('Please input your password {u}@{h} > '.format(u=self.user, h=self.host))
        try:
            self.conn = pymysql.connect(host=self.host, user=self.user, passwd=self.password, port=self.port, charset=characterSet)
            if self.conn is not None:
                self.cur = self.conn.cursor(pymysql.cursors.DictCursor)
        except pymysql.Error:
            traceback.print_exc()

    def create_database(self, database_name, check_overwrite=True):
        """Create a database, optionally offering to drop an existing one.

        Args:
            database_name: Name of the database (inserted verbatim).
            check_overwrite: When true, ask util.has_database() whether it
                exists and, if so, interactively offer to drop it.

        Returns:
            The result of send_query() for the CREATE DATABASE statement.
        """
        query = "CREATE DATABASE " + database_name
        if check_overwrite and util.has_database(self, database_name):
            self._offer_drop(
                "Warning: 사용하려는 데이터베이스가 이미 존재하고 있습니다: {name}".format(name=database_name),
                "DROP DATABASE {db}".format(db=database_name),
                "데이터베이스 삭제 실패",
            )
        return self.send_query(query)

    def use_database(self, database_name, create_safety=True):
        """Select a database, creating it first when allowed and missing.

        Args:
            database_name: Database to select.
            create_safety: When true and ``USE`` fails, try to create the
                database and select it once more.

        Returns:
            True when the database is selected, False otherwise.
        """
        if self.send_query("USE %s" % database_name):
            self.database = database_name
            return True
        if not create_safety:
            # send_query() has already printed the reason; there is no active
            # exception here, so printing a traceback would only emit
            # "NoneType: None".
            return False
        print("Database not found: %s" % database_name)
        print("Creating a new database: %s ..." % database_name)
        if self.create_database(database_name, False):
            return self.use_database(database_name, False)
        return False

    def export(self, export_path_name="unspecified", export_type='sql', tablename="Unknown_table", option=None, charset='utf-8'):
        """Export data to ``<export_path_name>.<export_type>``.

        Args:
            export_path_name: Output path without extension.
            export_type: ``'csv'`` dumps one table through a SELECT;
                ``'sql'`` dumps the selected database with mysqldump.
            tablename: Table to read (csv only).
            option: Extra SQL appended after the table name (csv only).
            charset: Encoding of the csv file.

        Returns:
            The written file name, or None when ``export_type`` is not
            supported (a message is printed in that case).

        Raises:
            pymysql.DatabaseError: If not connected or no database selected.
            Exception: If the query or the mysqldump run fails.
        """
        self.checkdb()
        if export_type not in ("sql", "csv"):
            print("The filetype read failed. Available types -> sql, csv")
            return None
        # Both export paths address the selected database by name.
        self.checkdb(True)
        # The output filename, including the extension.
        filename = export_path_name + "." + export_type
        if export_type == "csv":
            return self._export_csv(filename, tablename, option, charset)
        return self._export_sql(filename)

    def _export_csv(self, filename, tablename, option, charset):
        """Write every row of ``tablename`` to ``filename`` as CSV."""
        print("Export to excel file: %s" % filename)
        if option is None:
            option = ""
        query = 'SELECT * FROM {db}.{table_name} {op}'.format(db=self.database, table_name=tablename, op=option)
        if not self.send_query(query):
            raise Exception("Failed make the export file: %s" % filename)

        rows = self.cur.fetchall()
        if rows:
            # Same "__len__" filter as the data rows so the header lines up.
            header = [k for k in rows[0].keys() if "__len__" not in k]
        else:
            # No row to take the keys from; fall back to the cursor metadata
            # when the cursor exposes it (DB-API ``description``).
            description = getattr(self.cur, "description", None) or ()
            header = [col[0] for col in description if "__len__" not in col[0]]

        # The context manager closes the file even if a row fails to serialise.
        with open(filename, 'w', encoding=charset) as fs:
            writer = csv.writer(fs, lineterminator='\n')
            if header:
                writer.writerow(header)
            for row in rows:
                writer.writerow([row[k] for k in row.keys() if "__len__" not in k])
        print("Sucessfully make the export file: %s" % filename)
        return filename

    @staticmethod
    def _mysqldump_major_version(description):
        """Return the major MySQL version named by ``mysqldump --version``.

        Returns -1 when the text cannot be parsed or names a MariaDB client.

        NOTE(review): relies on the usual output formats, e.g.
        "Ver 8.0.33 for Linux ..." (MySQL 8) and
        "Ver 10.13 Distrib 5.7.29, ..." (MySQL 5.7, where "Ver" is the tool
        version, not the server release). Confirm against deployed clients.
        """
        if "mariadb" in description.lower():
            # --column-statistics is a MySQL 8 client option.
            return -1
        match = (re.search(r'Distrib (\d+)\.', description)
                 or re.search(r'Ver (\d+)\.', description))
        return int(match.group(1)) if match else -1

    def _export_sql(self, filename):
        """Dump the selected database to ``filename`` with mysqldump."""
        try:
            raw_version = subprocess.Popen(["mysqldump", "--version"], stdout=subprocess.PIPE).communicate()[0]
        except FileNotFoundError as err:
            print("To export to SQL can't execute, Because there's no export progream")
            raise Exception("Failed make the export file: %s" % filename) from err
        # communicate() yields bytes; decode before regex matching/printing.
        mysqldump_version_desc = raw_version.decode("utf-8", errors="replace").strip()
        mysqldump_version = self._mysqldump_major_version(mysqldump_version_desc)

        print("Export to sql file: %s" % filename)
        print("mysqldump version: %s" % mysqldump_version_desc)

        # Argument list instead of a shell string: the password, database
        # name and output path can no longer be interpreted by a shell.
        command = ["mysqldump"]
        if mysqldump_version >= 8:
            command.append('--column-statistics=0')
        command += ["-u", str(self.user)]
        if self.password != "" and self.password is not None:
            command.append("-p{pwd}".format(pwd=self.password))
        # Use the same port connect() uses so the dump hits the same server.
        command += ["-h", str(self.host), "-P", str(self.port), str(self.database)]

        with open(filename, "wb") as output:
            ps = subprocess.Popen(command, stdout=output)
            ps.communicate()
        if ps.returncode == 0:
            print("Sucessfully make the export file: %s" % filename)
            return filename
        raise Exception("Failed make the export file: %s" % filename)