2 Created on Mar 13, 2014 20 tf = tempfile.NamedTemporaryFile()
21 print os.path.basename(tf.name)
22 sql_name =
"sqlite:///" + os.path.basename(tf.name) +
"TestSchedulerTaskScheme.db" 23 config_dbi = dict({
"db_name": sql_name})
32 schedulerTask.id = startValue
33 schedulerTask.rTime = startValue + 1
34 schedulerTask.rTimeMax = startValue + 2
35 schedulerTask.state = startValue + 3
36 schedulerTask.priority = startValue + 4
41 print self.
dbi.getErrorCode()
42 if self.
dbi.getErrorCode() != dbi_consts.DBI_SUCCESS_CODE:
51 self.assertEqual(len(results[0]), 0,
"")
56 lookSchedulerTask.id = 1
62 fetchSchedulerTaskScheme = self.
dbi.fetch(lookSchedulerTaskScheme,
"id=%s" % lookSchedulerTask.id)[0]
64 fetchSchedulerTask = fetchSchedulerTaskScheme._getSchedulerTask()
66 self.assertEqual(self.
schedulerTask.__dict__, fetchSchedulerTask.__dict__,
"")
81 self.assertEqual(len(results[0]), 0,
"")
94 fetchSchedulerTaskScheme = self.
dbi.fetch(updateSchedulerTaskScheme,
"id=%s" % self.
schedulerTask.id)[0]
97 fetchSchedulerTask = fetchSchedulerTaskScheme._getSchedulerTask()
99 self.assertEqual(fetchSchedulerTask.__dict__, self.
schedulerTask.__dict__,
"")
This class describes the structure of a task item used in the Scheduler.
def test_insert_data(self)
def getNewSchedulerTask(self, startValue)
def test_create_empty_db(self)
def test_update_data(self)