HCE Project Python language Distributed Tasks Manager Application, Distributed Crawler Application and client API bindings.  2.0.0-chaika
Hierarchical Cluster Engine Python language binding
test_dtm_SchedulerTaskScheme.py
Go to the documentation of this file.
1 '''
2 Created on Mar 13, 2014
3 
4 @author: igor
5 '''
6 
7 import unittest
8 from dtm.SchedulerTask import SchedulerTask
9 from dtm.SchedulerTaskScheme import SchedulerTaskScheme
10 from dbi.dbi import DBI
11 import dbi.Constants as dbi_consts
12 import tempfile
13 import os
14 
15 
16 class TestSchedulerTaskScheme(unittest.TestCase):
17 
18 
def setUp(self):
    """Prepare a uniquely named SQLite database and the task fixtures.

    Sets three attributes used by the tests of this class:

    * ``self.dbi`` -- a ``DBI`` built from a ``db_name`` URL that embeds a
      unique temp-file basename, so separate runs do not share a database.
    * ``self.schedulerTask`` -- the task produced by
      ``getNewSchedulerTask(1)``; ``test_insert_data`` looks the stored row
      up with ``id=1``, so the fixture has to start at 1.
    * ``self.schedulerTaskScheme`` -- ``SchedulerTaskScheme`` wrapping that
      task; it is the object handed to ``insert``/``delete``/``fetchAll``.
    """
    # The temporary file only serves as a source of a unique name, so the
    # handle is released as soon as the name has been taken.
    with tempfile.NamedTemporaryFile() as tf:
        unique_prefix = os.path.basename(tf.name)
    # Parenthesised form prints the same text on Python 2 and Python 3.
    print(unique_prefix)
    sql_name = "sqlite:///" + unique_prefix + "TestSchedulerTaskScheme.db"
    # NOTE(review): "sqlite:///:memory:" was previously tried as db_name;
    # presumably it would avoid leaving a database file behind -- confirm
    # against DBI before switching.
    config_dbi = {"db_name": sql_name}
    self.dbi = DBI(config_dbi)
    # Fixtures every test method reads; without them each test fails with
    # AttributeError before touching the database.
    self.schedulerTask = self.getNewSchedulerTask(1)
    self.schedulerTaskScheme = SchedulerTaskScheme(self.schedulerTask)
28 
29 
def getNewSchedulerTask(self, startValue):
    """Return a ``SchedulerTask`` whose fields count up from *startValue*.

    ``id`` receives ``startValue`` itself; ``rTime``, ``rTimeMax``,
    ``state`` and ``priority`` receive ``startValue`` plus 1, 2, 3 and 4
    respectively.
    """
    task = SchedulerTask()
    # Position in the tuple is the offset added to startValue.
    attribute_order = ("id", "rTime", "rTimeMax", "state", "priority")
    for offset, attribute in enumerate(attribute_order):
        setattr(task, attribute, startValue + offset)
    return task
38 
39 
def checkError(self):
    """Fail the current test unless the last DBI call reported success.

    Reads ``self.dbi.getErrorCode()`` once, prints it (the output the
    original helper produced) and compares it with
    ``dbi_consts.DBI_SUCCESS_CODE``.

    Raises:
        AssertionError: when the error code differs from the success code.
    """
    error_code = self.dbi.getErrorCode()
    # Parenthesised form prints the same text on Python 2 and Python 3.
    print(error_code)
    # assertEqual is not removed by ``python -O`` (a bare ``assert`` is) and
    # reports the offending code instead of an empty AssertionError.
    self.assertEqual(error_code, dbi_consts.DBI_SUCCESS_CODE,
                     "DBI call failed with error code %s" % error_code)
44 
45 
47  results = self.dbi.fetchAll(self.schedulerTaskScheme)
48  self.checkError()
49 
50 
51  self.assertEqual(len(results[0]), 0, "")
52 
53 
def test_insert_data(self):
    """Insert the fixture row, read it back by id and compare the tasks.

    A separate probe scheme (task with ``id`` 1) is used as the fetch
    template; the attribute dict of the fetched task must equal that of
    ``self.schedulerTask``. The row is deleted again at the end.
    """
    probeTask = SchedulerTask()
    probeTask.id = 1
    probeScheme = SchedulerTaskScheme(probeTask)

    self.dbi.insert(self.schedulerTaskScheme)
    self.checkError()

    # Index the result before checking the error code, as the original did.
    fetchedRows = self.dbi.fetch(probeScheme, "id=%s" % probeTask.id)
    storedScheme = fetchedRows[0]
    self.checkError()
    storedTask = storedScheme._getSchedulerTask()

    self.assertEqual(self.schedulerTask.__dict__, storedTask.__dict__, "")

    # Remove the inserted row again.
    self.dbi.delete(self.schedulerTaskScheme, "id=%s" % self.schedulerTask.id)
69 
70 
def test_del_data(self):
    """Insert the fixture row, delete it by id and expect nothing left.

    After the delete, the first element of the ``fetchAll`` result must
    have length 0.
    """
    scheme = self.schedulerTaskScheme

    self.dbi.insert(scheme)
    self.checkError()

    whereClause = "id=%s" % self.schedulerTask.id
    self.dbi.delete(scheme, whereClause)
    self.checkError()

    remaining = self.dbi.fetchAll(scheme)
    self.checkError()

    firstResult = remaining[0]
    self.assertEqual(len(firstResult), 0, "")
82 
83 
def test_update_data(self):
    """Insert the fixture row, change ``state`` to 101 and verify the update.

    The fixture task itself is mutated, a new scheme is built from it and
    passed to ``update``; the task fetched back by id must have the same
    attribute dict as the mutated fixture. The row is deleted at the end.
    """
    self.dbi.insert(self.schedulerTaskScheme)
    self.checkError()

    task = self.schedulerTask
    task.state = 101
    changedScheme = SchedulerTaskScheme(task)

    self.dbi.update(changedScheme, "id=%s" % task.id)
    self.checkError()

    # Index the result before checking the error code, as the original did.
    matchingRows = self.dbi.fetch(changedScheme, "id=%s" % task.id)
    storedScheme = matchingRows[0]
    self.checkError()

    storedTask = storedScheme._getSchedulerTask()

    self.assertEqual(storedTask.__dict__, task.__dict__, "")

    # Remove the inserted row again.
    self.dbi.delete(self.schedulerTaskScheme, "id=%s" % task.id)
Class describing the structure of a task item used in the Scheduler.
Definition: dbi.py:1