#####
#
# Module name: db.py
# Purpose: SQLite3 database class definition & functions
#
# Notes:
#
#####
# Import system modules
import sqlite3
import sys
import os
from datetime import datetime
from datetime import timedelta
# Import dupReport modules
import globs
import drdatetime
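
# Illustrative usage sketch (an assumption, not dupReport's documented API): the Database
# class below is typically created once at startup and shared through globs, roughly:
#
#   import db
#   globs.db = db.Database('/path/to/dupReport.db')     # Open (or create) the database
#   needUpgrade, currVer = globs.db.checkDbVersion()    # See if the schema needs upgrading
#   ...
#   globs.db.dbCommit()
#   globs.db.dbClose()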
class Database:
    dbConn = None

    def __init__(self, dbPath):
        globs.log.write(globs.SEV_NOTICE, function='Database', action='Init', msg='Initializing database manager.')

        # First, see if the database is there. If not, need to create it
        isThere = os.path.isfile(dbPath)

        if self.dbConn:     # If not None, a DB connection already exists. This is bad.
            globs.log.write(globs.SEV_ERROR, function='Database', action='Init', msg='SQLite3 Error: trying to reinitialize the database connection. Exiting program.')
            globs.closeEverythingAndExit(1)     # Abort program. Can't continue with DB error

        try:
            self.dbConn = sqlite3.connect(dbPath)   # Connect to database
        except sqlite3.Error as err:
            globs.log.write(globs.SEV_ERROR, function='Database', action='Init', msg='SQLite3 error connecting to database: {}. Exiting program.'.format(err.args[0]))
            globs.closeEverythingAndExit(1)     # Abort program. Can't continue with DB error

        if not isThere:     # Database did not exist. Need to initialize contents
            globs.log.write(globs.SEV_NOTICE, function='Database', action='Init', msg='New database. Needs initializing.')
            self.dbInitialize()

        return None
    # Close database connection
    def dbClose(self):
        globs.log.write(globs.SEV_NOTICE, function='Database', action='dbClose', msg='Closing database manager.')

        # Don't attempt to close a nonexistent connection
        if self.dbConn:
            self.dbConn.close()

        self.dbConn = None
        return None
    # Check whether the database schema needs upgrading.
    # Returns (needToUpgrade, currVerNum): needToUpgrade is True if the DB is out of date, False if it is current.
    def checkDbVersion(self):
        needToUpgrade = False

        dbCursor = self.execSqlStmt('SELECT major, minor, subminor FROM version WHERE desc = \'database\'')
        maj, min, subm = dbCursor.fetchone()
        currVerNum = (maj * 100) + (min * 10) + subm
        newVerNum = (globs.dbVersion[0] * 100) + (globs.dbVersion[1] * 10) + globs.dbVersion[2]
        globs.log.write(globs.SEV_DEBUG, function='Database', action='checkDbVersion', msg='Database: current version={} new version={}'.format(currVerNum, newVerNum))

        if currVerNum < newVerNum:
            globs.log.write(globs.SEV_NOTICE, function='Database', action='checkDbVersion', msg='Database version is out of date. Needs update to latest version.')
            needToUpgrade = True

        return needToUpgrade, currVerNum
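
    # Worked example for checkDbVersion() (version numbers are illustrative only):
    #   stored version row (2, 2, 9)   -> currVerNum = 2*100 + 2*10 + 9 = 229
    #   globs.dbVersion == [3, 0, 1]   -> newVerNum  = 3*100 + 0*10 + 1 = 301
    #   229 < 301, so the method returns (True, 229) and an upgrade is needed.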
    # Commit pending database transaction
    def dbCommit(self):
        globs.log.write(globs.SEV_DEBUG, function='Database', action='dbCommit', msg='Committing transaction.')

        if self.dbConn:     # Don't try to commit to a nonexistent connection
            self.dbConn.commit()

        return None
    # Insert a parsed backup email into the emails table
    def execEmailInsertSql(self, emailParts):
        globs.log.write(globs.SEV_NOTICE, function='Database', action='execEmailInsertSql', msg='Inserting into emails table: messageId={} sourceComp={} destComp={}'.format(emailParts['header']['messageId'], emailParts['header']['sourceComp'], emailParts['header']['destComp']))

        durVal = float(emailParts['body']['endTimestamp']) - float(emailParts['body']['beginTimestamp'])

        sqlStmt = "INSERT INTO emails(messageId, sourceComp, destComp, emailTimestamp, \
            deletedFiles, deletedFolders, modifiedFiles, examinedFiles, \
            openedFiles, addedFiles, sizeOfModifiedFiles, sizeOfAddedFiles, sizeOfExaminedFiles, \
            sizeOfOpenedFiles, notProcessedFiles, addedFolders, tooLargeFiles, filesWithError, \
            modifiedFolders, modifiedSymlinks, addedSymlinks, deletedSymlinks, partialBackup, \
            dryRun, mainOperation, parsedResult, verboseOutput, verboseErrors, endTimestamp, \
            beginTimestamp, duration, messages, warnings, errors, dbSeen, dupversion, logdata, bytesUploaded, bytesDownloaded) \
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 1, ?, ?, ?, ?)"

        data = (emailParts['header']['messageId'], emailParts['header']['sourceComp'], emailParts['header']['destComp'], emailParts['header']['emailTimestamp'], emailParts['body']['deletedFiles'], \
            emailParts['body']['deletedFolders'], emailParts['body']['modifiedFiles'], emailParts['body']['examinedFiles'], emailParts['body']['openedFiles'], \
            emailParts['body']['addedFiles'], emailParts['body']['sizeOfModifiedFiles'], emailParts['body']['sizeOfAddedFiles'], emailParts['body']['sizeOfExaminedFiles'], emailParts['body']['sizeOfOpenedFiles'], \
            emailParts['body']['notProcessedFiles'], emailParts['body']['addedFolders'], emailParts['body']['tooLargeFiles'], emailParts['body']['filesWithError'], \
            emailParts['body']['modifiedFolders'], emailParts['body']['modifiedSymlinks'], emailParts['body']['addedSymlinks'], emailParts['body']['deletedSymlinks'], \
            emailParts['body']['partialBackup'], emailParts['body']['dryRun'], emailParts['body']['mainOperation'], emailParts['body']['parsedResult'], emailParts['body']['verboseOutput'], \
            emailParts['body']['verboseErrors'], emailParts['body']['endTimestamp'], emailParts['body']['beginTimestamp'], \
            durVal, emailParts['body']['messages'], emailParts['body']['warnings'], emailParts['body']['errors'], emailParts['body']['dupversion'], emailParts['body']['logdata'], emailParts['body']['bytesUploaded'], emailParts['body']['bytesDownloaded'])

        globs.log.write(globs.SEV_DEBUG, function='Database', action='execEmailInsertSql', msg='sqlStmt=[{}]'.format(sqlStmt))
        globs.log.write(globs.SEV_DEBUG, function='Database', action='execEmailInsertSql', msg='data=[{}]'.format(data))

        if not self.dbConn:
            return None

        # Set db cursor
        curs = self.dbConn.cursor()
        try:
            curs.execute(sqlStmt, data)
        except sqlite3.Error as err:
            globs.log.write(globs.SEV_ERROR, function='Database', action='execEmailInsertSql', msg='SQLite error: {}'.format(err.args[0]))
            globs.closeEverythingAndExit(1)     # Abort program. Can't continue with DB error

        self.dbCommit()
        return None
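
    # For reference, execEmailInsertSql() expects an emailParts dict shaped roughly as below.
    # This sketch is inferred from the keys referenced above; it is illustrative, not an
    # exhaustive field list.
    #
    #   emailParts = {
    #       'header': {'messageId': '<id@example>', 'sourceComp': 'Client1', 'destComp': 'NAS',
    #                  'emailTimestamp': 1600000000.0},
    #       'body':   {'beginTimestamp': 1600000000.0, 'endTimestamp': 1600003600.0,
    #                  'examinedFiles': 1000, 'sizeOfExaminedFiles': 123456789,
    #                  'parsedResult': 'Success',
    #                  # ...plus the remaining 'body' fields used in the INSERT above
    #                  },
    #   }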
    # Execute an INSERT statement built by the report modules
    def execReportInsertSql(self, sqlStmt, sqlData):
        globs.log.write(globs.SEV_NOTICE, function='Database', action='execReportInsertSql', msg='Inserting report data: sqlStmt=[{}] sqlData=[{}]'.format(sqlStmt, sqlData))

        if not self.dbConn:
            return None

        # Set db cursor
        curs = self.dbConn.cursor()
        try:
            curs.execute(sqlStmt, sqlData)
        except sqlite3.Error as err:
            globs.log.write(globs.SEV_ERROR, function='Database', action='execReportInsertSql', msg='SQLite error: {}'.format(err.args[0]))
            globs.closeEverythingAndExit(1)     # Abort program. Can't continue with DB error

        self.dbCommit()
        return None
    # Execute a SQLite command and manage exceptions
    # Return the cursor object to the command result
    def execSqlStmt(self, stmt):
        globs.log.write(globs.SEV_NOTICE, function='Database', action='execSqlStmt', msg='Executing SQL statement: [{}]'.format(stmt))

        if not self.dbConn:
            return None

        # Set db cursor
        curs = self.dbConn.cursor()
        try:
            curs.execute(stmt)
        except sqlite3.Error as err:
            globs.log.write(globs.SEV_ERROR, function='Database', action='execSqlStmt', msg='SQLite error: {}'.format(err.args[0]))
            globs.closeEverythingAndExit(1)     # Abort program. Can't continue with DB error

        # Return the cursor to the executed command result.
        return curs
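
    # Typical call pattern (illustrative): execSqlStmt() returns the cursor, so callers fetch
    # results directly from it, e.g.:
    #
    #   dbCursor = globs.db.execSqlStmt("SELECT source, destination FROM backupsets")
    #   for source, destination in dbCursor.fetchall():
    #       ...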
    # Initialize database to empty, default tables
    def dbInitialize(self):
        globs.log.write(globs.SEV_NOTICE, function='Database', action='dbInitialize', msg='Initializing (resetting) database.')

        # Don't initialize a nonexistent connection
        if not self.dbConn:
            return None

        # Drop any tables and indices that might already exist in the database
        self.execSqlStmt("drop table if exists version")
        self.execSqlStmt("drop table if exists emails")
        self.execSqlStmt("drop table if exists backupsets")
        self.execSqlStmt("drop table if exists report")
        self.execSqlStmt("drop index if exists emailindx")
        self.execSqlStmt("drop index if exists srcdestindx")

        # version table holds the current database version.
        # Used to check whether the database format needs to change.
        self.execSqlStmt("create table version (desc varchar(20), major int, minor int, subminor int)")
        self.execSqlStmt("insert into version(desc, major, minor, subminor) values (\'database\',{},{},{})".format(globs.dbVersion[0], globs.dbVersion[1], globs.dbVersion[2]))

        # emails table holds information about all emails received
        sqlStmt = "create table emails (messageId varchar(50), sourceComp varchar(50), destComp varchar(50), \
            emailTimestamp real, deletedFiles int, deletedFolders int, modifiedFiles int, \
            examinedFiles int, openedFiles int, addedFiles int, sizeOfModifiedFiles int, sizeOfAddedFiles int, sizeOfExaminedFiles int, \
            sizeOfOpenedFiles int, notProcessedFiles int, addedFolders int, tooLargeFiles int, filesWithError int, \
            modifiedFolders int, modifiedSymlinks int, addedSymlinks int, deletedSymlinks int, partialBackup varchar(30), \
            dryRun varchar(30), mainOperation varchar(30), parsedResult varchar(30), verboseOutput varchar(30), \
            verboseErrors varchar(30), endTimestamp real, \
            beginTimestamp real, duration real, messages varchar(255), warnings varchar(255), errors varchar(255), failedMsg varchar(100), dbSeen int, dupversion varchar(100), logdata varchar(255), \
            bytesUploaded int, bytesDownloaded int)"
        self.execSqlStmt(sqlStmt)
        self.execSqlStmt("create index emailindx on emails (messageId)")
        self.execSqlStmt("create index srcdestindx on emails (sourceComp, destComp)")

        # report table holds the rows used to build the output reports
        sqlStmt = "create table report (source varchar(20), destination varchar(20), timestamp real, date real, time real, duration real, examinedFiles int, examinedFilesDelta int, \
            sizeOfExaminedFiles int, fileSizeDelta int, addedFiles int, deletedFiles int, modifiedFiles int, filesWithError int, parsedResult varchar(30), messages varchar(255), \
            warnings varchar(255), errors varchar(255), failedMsg varchar(100), dupversion varchar(100), logdata varchar(255), bytesUploaded int, bytesDownloaded int)"
        self.execSqlStmt(sqlStmt)

        # backupsets table contains information on all source-destination pairs in the backups
        self.execSqlStmt("create table backupsets (source varchar(20), destination varchar(20), lastFileCount integer, lastFileSize integer, \
            lastTimestamp real, dupversion varchar(100))")

        self.dbCommit()
        self.dbCompact()

        globs.log.write(globs.SEV_NOTICE, function='Database', action='dbInitialize', msg='Database initialization complete.')
        return None
    # See if a particular message ID is already in the database
    # Return True (already there) or False (not there)
    def searchForMessage(self, msgID):
        globs.log.write(globs.SEV_NOTICE, function='Database', action='searchForMessage', msg='Searching for message {} in database.'.format(msgID))

        sqlStmt = "SELECT messageId FROM emails WHERE messageId=\'{}\'".format(msgID)
        dbCursor = self.execSqlStmt(sqlStmt)
        idExists = dbCursor.fetchone()
        if idExists:
            globs.log.write(globs.SEV_NOTICE, function='Database', action='searchForMessage', msg='Message [{}] already in email database'.format(msgID))
            return True
        else:
            globs.log.write(globs.SEV_NOTICE, function='Database', action='searchForMessage', msg='Message [{}] not yet in email database'.format(msgID))
            return False
    # See if a source/destination pair is already in the backupsets table
    # If add2Db is True, add the pair when it isn't found
    # Return True if the pair already existed, False if it did not
    def searchSrcDestPair(self, src, dest, add2Db = True):
        globs.log.write(globs.SEV_NOTICE, function='Database', action='searchSrcDestPair', msg='Searching for {}{}{} in backupsets'.format(src, globs.opts['srcdestdelimiter'], dest))

        sqlStmt = "SELECT source, destination FROM backupsets WHERE source=\'{}\' AND destination=\'{}\'".format(src, dest)
        dbCursor = self.execSqlStmt(sqlStmt)
        idExists = dbCursor.fetchone()
        if idExists:
            globs.log.write(globs.SEV_NOTICE, function='Database', action='searchSrcDestPair', msg='{}{}{} already in backupsets.'.format(src, globs.opts['srcdestdelimiter'], dest))
            return True

        if add2Db is True:
            sqlStmt = "INSERT INTO backupsets (source, destination, lastFileCount, lastFileSize, lastTimestamp, dupversion) \
                VALUES ('{}', '{}', 0, 0, 0, '')".format(src, dest)
            self.execSqlStmt(sqlStmt)
            self.dbCommit()
            globs.log.write(globs.SEV_NOTICE, function='Database', action='searchSrcDestPair', msg='{}{}{} added to database'.format(src, globs.opts['srcdestdelimiter'], dest))

        return False
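
    # Usage note (illustrative): searchSrcDestPair() doubles as a "check, and add if missing"
    # helper, e.g.:
    #
    #   if not globs.db.searchSrcDestPair('Client1', 'NAS'):                  # adds the pair if absent
    #       ...first time this source/destination pair has been seen...
    #   exists = globs.db.searchSrcDestPair('Client1', 'NAS', add2Db=False)   # check only, never add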
    # Roll back database to a specific date/time
    # datespec = date & time to roll back to
    def rollback(self, datespec):
        globs.log.write(globs.SEV_NOTICE, function='Database', action='rollback', msg='Rolling back database: spec={}'.format(datespec))

        # See if we're using a delta-based time spec (Issue #131)
        deltaParts = drdatetime.timeDeltaSpec(datespec)
        if deltaParts != False:
            today = datetime.now()
            globs.log.write(globs.SEV_DEBUG, function='Database', action='rollback', msg='Using delta timespec. Today={}'.format(today))
            for i in range(len(deltaParts)):
                tval = int(deltaParts[i][:-1])
                tspec = deltaParts[i][-1:]
                if tspec == 's':    # Subtract seconds
                    today -= timedelta(seconds=tval)
                elif tspec == 'm':  # Subtract minutes
                    today -= timedelta(minutes=tval)
                elif tspec == 'h':  # Subtract hours
                    today -= timedelta(hours=tval)
                elif tspec == 'd':  # Subtract days
                    today -= timedelta(days=tval)
                elif tspec == 'w':  # Subtract weeks
                    today -= timedelta(weeks=tval)
                globs.log.write(globs.SEV_DEBUG, function='Database', action='rollback', msg='Rolled back {}{}. Today now={}'.format(tval, tspec, today))
            newTimeStamp = today.timestamp()
        else:
            # Get timestamp for the input date/time
            newTimeStamp = drdatetime.toTimestamp(datespec)

        # Delete all email records that happened after the input date/time
        sqlStmt = 'DELETE FROM emails WHERE emailtimestamp > {}'.format(newTimeStamp)
        dbCursor = self.execSqlStmt(sqlStmt)

        # Find all backup set records that were updated after the input date/time
        sqlStmt = 'SELECT source, destination FROM backupsets WHERE lastTimestamp > {}'.format(newTimeStamp)
        dbCursor = self.execSqlStmt(sqlStmt)
        setRows = dbCursor.fetchall()
        for source, destination in setRows:
            # Select the largest timestamp from the remaining data for that source/destination
            sqlStmt = 'select max(endTimeStamp), examinedFiles, sizeOfExaminedFiles, dupversion from emails where sourceComp = \'{}\' and destComp= \'{}\''.format(source, destination)
            dbCursor = self.execSqlStmt(sqlStmt)
            emailTimestamp, examinedFiles, sizeOfExaminedFiles, dupversion = dbCursor.fetchone()
            if emailTimestamp is None:
                # After the rollback, some source/destination pairs may have no corresponding entries in the database,
                # meaning they were not seen until after the rollback point.
                # Remove them from backupsets to return the database to the state it was in before that point.
                globs.log.write(globs.SEV_NOTICE, function='Database', action='rollback', msg='Deleting {}{}{} from backupsets. Not seen until after rollback.'.format(source, globs.opts['srcdestdelimiter'], destination))
                sqlStmt = 'DELETE FROM backupsets WHERE source = \"{}\" AND destination = \"{}\"'.format(source, destination)
                dbCursor = self.execSqlStmt(sqlStmt)
            else:
                globs.log.write(globs.SEV_NOTICE, function='Database', action='rollback', msg='Resetting {}{}{} to {}'.format(source, globs.opts['srcdestdelimiter'], destination, drdatetime.fromTimestamp(emailTimestamp)))
                # Update backupsets table to reflect the rolled-back date
                sqlStmt = 'update backupsets set lastFileCount={}, lastFileSize={}, lastTimestamp={}, dupversion=\'{}\' where source = \'{}\' and destination = \'{}\''.format(examinedFiles, sizeOfExaminedFiles, emailTimestamp, dupversion, source, destination)
                dbCursor = self.execSqlStmt(sqlStmt)

        self.dbCommit()
        return None
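
    # Example rollback specs (illustrative; the exact delta syntax is whatever
    # drdatetime.timeDeltaSpec() accepts):
    #
    #   globs.db.rollback('1w,3d,2h')              # delta spec: back 1 week, 3 days, 2 hours from now
    #   globs.db.rollback('01/01/2020 12:00:00')   # absolute date/time, parsed by drdatetime.toTimestamp()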
    # Remove a source/destination pair from the database
    def removeSrcDest(self, source, destination):
        globs.log.write(globs.SEV_NOTICE, function='Database', action='removeSrcDest', msg='Deleting {}{}{} from database.'.format(source, globs.opts['srcdestdelimiter'], destination))

        # Does the src/dest pair exist in the database?
        exists = self.searchSrcDestPair(source, destination, False)
        if not exists:
            globs.log.write(globs.SEV_NOTICE, function='Database', action='removeSrcDest', msg='Pair {}{}{} does not exist in database. Check spelling and capitalization then try again.'.format(source, globs.opts['srcdestdelimiter'], destination))
            return False

        sqlStmt = "DELETE FROM backupsets WHERE source = \"{}\" AND destination = \"{}\"".format(source, destination)
        dbCursor = self.execSqlStmt(sqlStmt)
        sqlStmt = "DELETE FROM emails WHERE sourceComp = \"{}\" AND destComp = \"{}\"".format(source, destination)
        dbCursor = self.execSqlStmt(sqlStmt)
        self.dbCommit()

        globs.log.write(globs.SEV_NOTICE, function='Database', action='removeSrcDest', msg='{}{}{} removed from database.'.format(source, globs.opts['srcdestdelimiter'], destination))
        globs.log.write(globs.SEV_NOTICE, function='Database', action='removeSrcDest', msg='Please remove all emails referencing the \'{}{}{}\' backup,\nor they will be added back into the database the next time dupReport is run.'.format(source, globs.opts['srcdestdelimiter'], destination))
        globs.log.out('Please remove all emails referencing the \'{}{}{}\' backup,\nor they will be added back into the database the next time dupReport is run.'.format(source, globs.opts['srcdestdelimiter'], destination))
        return True
    # Purge database of old (unseen) emails
    def purgeOldEmails(self):
        globs.log.write(globs.SEV_NOTICE, function='Database', action='purgeOldEmails', msg='Purging unseen emails from database')

        self.execSqlStmt('DELETE FROM emails WHERE dbSeen = 0')
        self.dbCommit()
        self.dbCompact()
        return None
    # Compact database to eliminate unused space
    def dbCompact(self):
        globs.log.write(globs.SEV_NOTICE, function='Database', action='dbCompact', msg='Compacting database')

        # Need to reset the connection's isolation level in order to compress the database.
        # Why? Not sure. But see https://github.com/ghaering/pysqlite/issues/109 for details.
        isoTmp = self.dbConn.isolation_level
        self.dbConn.isolation_level = None
        self.execSqlStmt('VACUUM')
        self.dbConn.isolation_level = isoTmp    # Reset isolation level back to its previous value

        self.dbCommit()
        globs.log.write(globs.SEV_NOTICE, function='Database', action='dbCompact', msg='Database compaction complete.')
        return None
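
    # Background note on dbCompact() (general sqlite3 behavior, not dupReport-specific):
    # VACUUM cannot run inside an open transaction, and setting isolation_level to None puts
    # the sqlite3 connection into autocommit mode, which is why the level is temporarily cleared:
    #
    #   conn = sqlite3.connect('example.db')
    #   conn.isolation_level = None     # autocommit: statements aren't wrapped in a transaction
    #   conn.execute('VACUUM')          # allowed here; inside a transaction it raises sqlite3.OperationalError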