HCE Project Python language Distributed Tasks Manager Application, Distributed Crawler Application and client API bindings.  2.0.0-chaika
Hierarchical Cluster Engine Python language binding
tests.test_dtm_SchedulerTaskScheme.TestSchedulerTaskScheme Class Reference

Public Member Functions

def setUp (self)
 
def getNewSchedulerTask (self, startValue)
 
def checkError (self)
 
def test_create_empty_db (self)
 
def test_insert_data (self)
 
def test_del_data (self)
 
def test_update_data (self)
 

Public Attributes

 dbi
 
 schedulerTask
 
 schedulerTaskScheme
 

Detailed Description

Definition at line 16 of file test_dtm_SchedulerTaskScheme.py.

Member Function Documentation

◆ checkError()

def tests.test_dtm_SchedulerTaskScheme.TestSchedulerTaskScheme.checkError (   self)

Definition at line 40 of file test_dtm_SchedulerTaskScheme.py.

def checkError(self):
    print self.dbi.getErrorCode()
    if self.dbi.getErrorCode() != dbi_consts.DBI_SUCCESS_CODE:
        assert(False)

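The check above fails the test with a bare `assert(False)`, which reports nothing about the error code that caused the failure. A minimal sketch of the same check written with a `unittest` assertion follows, in Python 3 syntax. `FakeDBI` and the value `0` for the success code are assumptions made for illustration; the real `DBI` class and `dbi_consts.DBI_SUCCESS_CODE` are not shown on this page.

```python
import unittest

# Assumption: stands in for dbi_consts.DBI_SUCCESS_CODE, whose real value
# is not given in this reference page.
DBI_SUCCESS_CODE = 0


class FakeDBI:
    """Hypothetical stand-in exposing only getErrorCode()."""

    def __init__(self, error_code):
        self._error_code = error_code

    def getErrorCode(self):
        return self._error_code


class ErrorCheckExample(unittest.TestCase):
    def check_error(self, dbi):
        # assertEqual includes both values in the failure message.
        self.assertEqual(dbi.getErrorCode(), DBI_SUCCESS_CODE,
                         "DBI reported an error")

    def test_success_passes(self):
        self.check_error(FakeDBI(DBI_SUCCESS_CODE))

    def test_error_code_fails(self):
        with self.assertRaises(AssertionError):
            self.check_error(FakeDBI(1))


def run_examples():
    """Run the example case and return the collected TestResult."""
    suite = unittest.defaultTestLoader.loadTestsFromTestCase(ErrorCheckExample)
    result = unittest.TestResult()
    suite.run(result)
    return result
```

Calling `run_examples()` runs both example tests without writing to the console and returns a result object that can be inspected with `wasSuccessful()`.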

◆ getNewSchedulerTask()

def tests.test_dtm_SchedulerTaskScheme.TestSchedulerTaskScheme.getNewSchedulerTask (self, startValue)

Definition at line 30 of file test_dtm_SchedulerTaskScheme.py.

def getNewSchedulerTask(self, startValue):
    schedulerTask = SchedulerTask()
    schedulerTask.id = startValue
    schedulerTask.rTime = startValue + 1
    schedulerTask.rTimeMax = startValue + 2
    schedulerTask.state = startValue + 3
    schedulerTask.priority = startValue + 4
    return schedulerTask

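The helper above derives every field from `startValue`, so each attribute holds a distinct, predictable value. The sketch below illustrates the resulting values in Python 3 syntax. The `SchedulerTask` class here is a hypothetical stand-in with only the five attributes the helper sets; the real class is defined elsewhere in the DTM package and may carry more.

```python
class SchedulerTask:
    """Hypothetical stand-in with only the attributes set by the helper."""

    def __init__(self):
        self.id = None
        self.rTime = None
        self.rTimeMax = None
        self.state = None
        self.priority = None


def get_new_scheduler_task(start_value):
    # Mirrors the helper: each field is offset from start_value by 0..4.
    task = SchedulerTask()
    task.id = start_value
    task.rTime = start_value + 1
    task.rTimeMax = start_value + 2
    task.state = start_value + 3
    task.priority = start_value + 4
    return task


# With start_value = 1, as used in setUp():
fields = vars(get_new_scheduler_task(1))
# fields == {"id": 1, "rTime": 2, "rTimeMax": 3, "state": 4, "priority": 5}
```

Because no two fields share a value, a mismatch after a database round trip points to the specific column that was stored or read incorrectly.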

◆ setUp()

def tests.test_dtm_SchedulerTaskScheme.TestSchedulerTaskScheme.setUp (   self)

Definition at line 19 of file test_dtm_SchedulerTaskScheme.py.

def setUp(self):
    tf = tempfile.NamedTemporaryFile()
    print os.path.basename(tf.name)
    sql_name = "sqlite:///" + os.path.basename(tf.name) + "TestSchedulerTaskScheme.db"
    config_dbi = dict({"db_name": sql_name})
    #config_dbi = dict({"db_name": "sqlite:///:memory:"})
    self.dbi = DBI(config_dbi)
    self.schedulerTask = self.getNewSchedulerTask(1)
    self.schedulerTaskScheme = SchedulerTaskScheme(self.schedulerTask)

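The fixture above uses only the base name of a temporary file, so the database path is relative to the current working directory and nothing on this page removes the file afterwards. One possible alternative, sketched here in Python 3 syntax, keeps the database inside a temporary directory that is deleted when the test ends. `make_db_config` is a hypothetical helper; the `"db_name"` key and the `sqlite:///` prefix are copied from the listing, and whether `DBI` accepts an absolute path in this form is an assumption.

```python
import os
import shutil
import tempfile
import unittest


def make_db_config(directory, file_name="TestSchedulerTaskScheme.db"):
    """Build a config dict like the one passed to DBI in setUp()."""
    return {"db_name": "sqlite:///" + os.path.join(directory, file_name)}


class TempDbExample(unittest.TestCase):
    def setUp(self):
        # Each test gets its own directory; addCleanup removes it even
        # when the test fails.
        self.tmp_dir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, self.tmp_dir, ignore_errors=True)
        self.config_dbi = make_db_config(self.tmp_dir)

    def test_config_points_into_tmp_dir(self):
        self.assertIn(self.tmp_dir, self.config_dbi["db_name"])
```

The commented-out `sqlite:///:memory:` line in the listing suggests an in-memory database was also considered, which would avoid the cleanup question entirely.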

◆ test_create_empty_db()

def tests.test_dtm_SchedulerTaskScheme.TestSchedulerTaskScheme.test_create_empty_db (   self)

Definition at line 46 of file test_dtm_SchedulerTaskScheme.py.

def test_create_empty_db(self):
    results = self.dbi.fetchAll(self.schedulerTaskScheme)
    self.checkError()

    self.assertEqual(len(results[0]), 0, "")


◆ test_del_data()

def tests.test_dtm_SchedulerTaskScheme.TestSchedulerTaskScheme.test_del_data (   self)

Definition at line 71 of file test_dtm_SchedulerTaskScheme.py.

def test_del_data(self):
    self.dbi.insert(self.schedulerTaskScheme)
    self.checkError()

    self.dbi.delete(self.schedulerTaskScheme, "id=%s" % self.schedulerTask.id)
    self.checkError()

    results = self.dbi.fetchAll(self.schedulerTaskScheme)
    self.checkError()

    self.assertEqual(len(results[0]), 0, "")


◆ test_insert_data()

def tests.test_dtm_SchedulerTaskScheme.TestSchedulerTaskScheme.test_insert_data (   self)

Definition at line 54 of file test_dtm_SchedulerTaskScheme.py.

def test_insert_data(self):
    lookSchedulerTask = SchedulerTask()
    lookSchedulerTask.id = 1
    lookSchedulerTaskScheme = SchedulerTaskScheme(lookSchedulerTask)

    self.dbi.insert(self.schedulerTaskScheme)
    self.checkError()

    fetchSchedulerTaskScheme = self.dbi.fetch(lookSchedulerTaskScheme, "id=%s" % lookSchedulerTask.id)[0]
    self.checkError()
    fetchSchedulerTask = fetchSchedulerTaskScheme._getSchedulerTask()

    self.assertEqual(self.schedulerTask.__dict__, fetchSchedulerTask.__dict__, "")

    self.dbi.delete(self.schedulerTaskScheme, "id=%s" % self.schedulerTask.id)

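The test above compares the `__dict__` of the inserted task with that of the fetched one instead of comparing the objects directly. The sketch below, in Python 3 syntax, shows why this matters for a plain class that does not define `__eq__`. The `Task` class is a hypothetical stand-in, and it is an assumption that `SchedulerTask` behaves the same way.

```python
class Task:
    """Hypothetical plain object without a custom __eq__."""

    def __init__(self, task_id, state):
        self.id = task_id
        self.state = state


original = Task(1, 4)
fetched = Task(1, 4)  # a distinct instance, as a database fetch would return

# Without __eq__, == falls back to identity, so equal-looking objects differ.
default_equal = original == fetched

# Comparing the attribute dictionaries checks every field by value.
dict_equal = original.__dict__ == fetched.__dict__
```

Here `default_equal` is `False` and `dict_equal` is `True`, which is the distinction the test relies on when it checks that the stored and fetched tasks carry the same field values.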

◆ test_update_data()

def tests.test_dtm_SchedulerTaskScheme.TestSchedulerTaskScheme.test_update_data (   self)

Definition at line 84 of file test_dtm_SchedulerTaskScheme.py.

def test_update_data(self):
    self.dbi.insert(self.schedulerTaskScheme)
    self.checkError()

    self.schedulerTask.state = 101
    updateSchedulerTaskScheme = SchedulerTaskScheme(self.schedulerTask)

    self.dbi.update(updateSchedulerTaskScheme, "id=%s" % self.schedulerTask.id)
    self.checkError()

    fetchSchedulerTaskScheme = self.dbi.fetch(updateSchedulerTaskScheme, "id=%s" % self.schedulerTask.id)[0]
    self.checkError()

    fetchSchedulerTask = fetchSchedulerTaskScheme._getSchedulerTask()

    self.assertEqual(fetchSchedulerTask.__dict__, self.schedulerTask.__dict__, "")

    self.dbi.delete(self.schedulerTaskScheme, "id=%s" % self.schedulerTask.id)

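The insert, update, fetch, and delete sequence exercised by this test can be illustrated with the standard-library `sqlite3` module. This is only a sketch in Python 3 syntax, not the `DBI` implementation: the table name `scheduler_task` and its column layout are assumptions based on the attributes set in `getNewSchedulerTask()`, and the queries use bound parameters where the test builds `"id=%s"` strings.

```python
import sqlite3

# Assumption: illustrative schema; the real one is defined by
# SchedulerTaskScheme and is not shown on this page.
SCHEMA = """
CREATE TABLE scheduler_task (
    id INTEGER PRIMARY KEY,
    rTime INTEGER,
    rTimeMax INTEGER,
    state INTEGER,
    priority INTEGER
)
"""


def round_trip():
    """Insert a row, update its state, fetch it, then delete it."""
    conn = sqlite3.connect(":memory:")
    try:
        conn.execute(SCHEMA)
        # Same field values that getNewSchedulerTask(1) produces.
        conn.execute("INSERT INTO scheduler_task VALUES (?, ?, ?, ?, ?)",
                     (1, 2, 3, 4, 5))
        # Same state change as in the test: state becomes 101.
        conn.execute("UPDATE scheduler_task SET state = ? WHERE id = ?",
                     (101, 1))
        row = conn.execute(
            "SELECT id, rTime, rTimeMax, state, priority "
            "FROM scheduler_task WHERE id = ?", (1,)).fetchone()
        # Clean up the row, as the test does in its final statement.
        conn.execute("DELETE FROM scheduler_task WHERE id = ?", (1,))
        remaining = conn.execute(
            "SELECT COUNT(*) FROM scheduler_task").fetchone()[0]
        return row, remaining
    finally:
        conn.close()


fetched_row, remaining_rows = round_trip()
```

After the round trip, `fetched_row` holds the updated state and `remaining_rows` is zero, matching what `test_update_data()` and `test_del_data()` verify through the `DBI` layer.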

Member Data Documentation

◆ dbi

tests.test_dtm_SchedulerTaskScheme.TestSchedulerTaskScheme.dbi

Definition at line 25 of file test_dtm_SchedulerTaskScheme.py.

◆ schedulerTask

tests.test_dtm_SchedulerTaskScheme.TestSchedulerTaskScheme.schedulerTask

Definition at line 26 of file test_dtm_SchedulerTaskScheme.py.

◆ schedulerTaskScheme

tests.test_dtm_SchedulerTaskScheme.TestSchedulerTaskScheme.schedulerTaskScheme

Definition at line 27 of file test_dtm_SchedulerTaskScheme.py.


The documentation for this class was generated from the following file: