19 config_dbi = dict({
"db_name":
"sqlite:///:memory:"})
30 taskLog.id = startValue
31 taskLog.pId = startValue + 1
32 taskLog.nodeName =str(startValue + 2)
33 taskLog.cDate = datetime.datetime.now()
34 taskLog.sDate = datetime.datetime.now()
35 taskLog.rDate = datetime.datetime.now()
36 taskLog.fDate = datetime.datetime.now()
37 taskLog.pTime = startValue + 7
38 taskLog.pTimeMax = startValue + 8
39 taskLog.state = startValue + 9
40 taskLog.uRRAM = startValue + 10
41 taskLog.uVRAM = startValue + 11
42 taskLog.uCPU = startValue + 12
43 taskLog.uThreads = startValue + 13
44 taskLog.tries = startValue + 14
49 print self.
dbi.getErrorCode()
50 if self.
dbi.getErrorCode() != dbi_consts.DBI_SUCCESS_CODE:
57 results = self.
dbi.fetchAll(taskLogScheme)
61 self.assertEqual(len(results[0]), 0,
"")
72 self.
dbi.insert(taskLogScheme)
75 fetchTaskLogScheme = self.
dbi.fetch(lookTaskLogScheme,
"id=%s" % taskLog.id)[0]
77 fetchTaskLog = fetchTaskLogScheme._getTaskLog()
79 self.assertEqual(taskLog.__dict__, fetchTaskLog.__dict__,
"")
81 self.
dbi.delete(taskLogScheme,
"id=%s" % taskLog.id)
88 self.
dbi.insert(taskLogScheme)
90 self.
dbi.delete(taskLogScheme,
"id=%s" % taskLog.id)
92 results = self.
dbi.fetchAll(taskLogScheme)
95 self.assertEqual(len(results[0]), 0,
"")
102 self.
dbi.insert(taskLogScheme)
107 self.
dbi.update(taskLogScheme,
"id=%s" % taskLog.id)
110 fetchTaskLogScheme = self.
dbi.fetch(taskLogScheme,
"id=%s" % taskLog.id)[0]
111 fetchTaskLog = fetchTaskLogScheme._getTaskLog()
113 self.assertEqual(fetchTaskLog.pId, taskLog.pId,
"")
115 self.
dbi.delete(taskLogScheme,
"id=%s" % taskLog.id)
123 self.
dbi.insert(taskLogScheme)
126 self.
dbi.insert(taskBackLogScheme)
129 fetchTaskLogScheme = self.
dbi.fetch(taskLogScheme,
"id=%s" % taskLog.id)[0]
131 fetchTaskLog = fetchTaskLogScheme._getTaskLog()
133 self.assertEqual(taskLog.__dict__, fetchTaskLog.__dict__,
"")
135 fetchTaskBackLogScheme = self.
dbi.fetch(taskBackLogScheme,
"id=%s" % taskLog.id)[0]
137 fetchTaskBackLog = fetchTaskBackLogScheme._getTaskLog()
139 self.assertEqual(taskLog.__dict__, fetchTaskBackLog.__dict__,
"")
142 self.
dbi.delete(taskLogScheme,
"id=%s" % taskLog.id)
143 self.
dbi.delete(taskBackLogScheme,
"id=%s" % taskLog.id)
def test_create_empty_db(self)
Class describes structures of task item used in TaskManager.
def test_insert_data(self)
def test_insert_in_two_bases(self)
def test_update_data(self)
def getNewTaskLog(self, startValue)