HCE Project Python language Distributed Tasks Manager Application, Distributed Crawler Application and client API bindings.  2.0.0-chaika
Hierarchical Cluster Engine Python language binding
test_dtm_Scheduler.py
Go to the documentation of this file.
1 '''
2 Created on Mar 14, 2014
3 
4 @author: igor
5 '''
6 import unittest
7 from mock import MagicMock, ANY
8 import ConfigParser
9 import tempfile
10 import os
11 from app.BaseServerManager import BaseServerManager
12 from app.PollerManager import PollerManager
13 from dtm.Scheduler import PLANED, SELECTED_EE
14 from dtm.Scheduler import Scheduler
15 from dtm.SchedulerTask import SchedulerTask
16 from dtm.SchedulerTaskScheme import SchedulerTaskScheme
17 from transport.ConnectionBuilderLight import ConnectionBuilderLight
18 from transport.ConnectionLight import ConnectionLight
19 from transport.Event import EventBuilder
20 from dtm.EventObjects import NewTask, UpdateTask, DeleteTask, ResourcesAVG, GetScheduledTasks
21 from dtm.EventObjects import GetScheduledTasksResponse
22 from dtm.Constants import EVENT_TYPES
23 from dbi.dbi import DBI
24 from test_dtm_TasksManager import Matcher
25 import dbi.Constants as dbi_const
26 import transport.Consts as tr_consts
27 
28 import logging
29 FORMAT = '%(asctime)s - %(thread)ld - %(threadName)s - %(name)s - %(funcName)s - %(levelname)s - %(message)s'
30 logging.basicConfig(level=logging.DEBUG, format=FORMAT) #basic logging configuration
31 
32 
33 def matchGetAVGResourceEvent(first, second):
34  return first.uid == second.uid and first.eventType == second.eventType
35 
36 
37 def matchEventUID(first, second):
38  return first.uid == second.uid
39 
40 
41 def matchGetScheduledTasks(first, second):
42  print "first ", first.eventObj.__dict__, "second ", second.eventObj.__dict__
43  return first.eventObj.__dict__ == second.eventObj.__dict__
44 
45 
46 class TestScheduler(unittest.TestCase):
47 
48 
49  def setUp(self):
50  self.config = ConfigParser.RawConfigParser()
51  cfg_section = "Scheduler"
52 
53  tf = tempfile.NamedTemporaryFile()
54  sql_name = "sqlite:///" + os.path.basename(tf.name) + "Scheduler.db"
55 
56  self.config.add_section(cfg_section)
57  self.config.set(cfg_section, Scheduler.SERVER, "Scheduler")
58  self.config.set(cfg_section, Scheduler.RESOURCES_MANAGER_CLIENT, "ResourcesManager")
59  self.config.set(cfg_section, "db_name", sql_name)
60 
61  connectionBuilderLight = ConnectionBuilderLight()
62  self.adminServerFake = connectionBuilderLight.build(tr_consts.SERVER_CONNECT, BaseServerManager.ADMIN_CONNECT_ENDPOINT)
63 
64  self.pollerManager_mock = MagicMock(spec=PollerManager)
65  self.connectionBuilderLight_mock = MagicMock(spec=ConnectionBuilderLight )
66 
67  self.dbi_mock = MagicMock(DBI)
68  self.serverConnection_mock = MagicMock(spec=ConnectionLight)
69  self.resourcesManager_mock = MagicMock(spec=ConnectionLight)
71 
72  self.connectionBuilderLight_mock.build.side_effect = self.connections_mock
73 
76 
77  self.taskId = 1
78  self.newTask = NewTask("ls", self.taskId)
80  self.deleteTask = DeleteTask(0, self.taskId)
82  self.resourceAVGEvent = self.eventBuilder.build(EVENT_TYPES.GET_AVG_RESOURCES_RESPONSE, self.resourceAVG)
83  self.newTaskEvent = self.eventBuilder.build(EVENT_TYPES.SCHEDULE_TASK, self.newTask)
84  self.updateTaskEvent = self.eventBuilder.build(EVENT_TYPES.UPDATE_TASK, self.updateTask)
85  self.deleteTaskEvent = self.eventBuilder.build(EVENT_TYPES.DELETE_TASK, self.deleteTask)
86  self.getScheduledTasksEvent = self.eventBuilder.build(EVENT_TYPES.GET_SCHEDULED_TASKS, GetScheduledTasks(10))
88 
89  self.pollerManager_mock.poll.return_value = [self.scheduler.SERVER]
90 
91 
92  def tearDown(self):
93  self.adminServerFake.close()
94 
95 
96  def check_task_in_dbi(self, task, state_condition=None):
97  schedulerTask = SchedulerTask()
98  schedulerTask.id = task.id
99  lookSchedulerTaskScheme = SchedulerTaskScheme(schedulerTask)
100  resSchedulerTaskScheme = self.scheduler.dbi.fetch(lookSchedulerTaskScheme, "id=%s" % task.id)[0]
101  resTaskLog = resSchedulerTaskScheme._getSchedulerTask()
102 
103  if task.id == resTaskLog.id:
104  if state_condition:
105  if resTaskLog.state == state_condition:
106  return True
107  else:
108  return False
109  return True
110  return False
111 
112 
113  def check_task_not_in_dbi(self, task):
114  schedulerTask = SchedulerTask()
115  schedulerTask.id = task.id
116  lookSchedulerTaskScheme = SchedulerTaskScheme(schedulerTask)
117  resSchedulerTaskScheme = self.scheduler.dbi.fetch(lookSchedulerTaskScheme, "id=%s" % task.id)[0]
118  if resSchedulerTaskScheme == None:
119  return True
120  return False
121 
122 
123  def delete_task_in_dbi(self, task):
124  schedulerTask = SchedulerTask()
125  schedulerTask.id = task.id
126  lookSchedulerTaskScheme = SchedulerTaskScheme(schedulerTask)
127  self.scheduler.dbi.delete(lookSchedulerTaskScheme, "id=%s" % task.id)
128  self.scheduler.checkDBIState()
129 
130 
131  def test_new_task(self):
132  self.serverConnection_mock.recv.return_value = self.newTaskEvent
133 
134  expect_GetAVGResourceEvent = self.eventBuilder.build(EVENT_TYPES.GET_AVG_RESOURCES, None)
135  expect_GetAVGResourceEvent.uid = self.newTaskEvent.uid
136 
137  match_event = Matcher(matchGetAVGResourceEvent, expect_GetAVGResourceEvent)
138 
139  self.scheduler.poll()
140 
141  self.assertEqual(self.scheduler.waitResourcesTasks[self.newTask.id], True, "")
142  self.assertEqual(self.scheduler.waitResourcesEvents[self.newTaskEvent.uid], self.newTaskEvent, "")
143  self.resourcesManager_mock.send.assert_called_once_with(match_event)
144 
145 
147  self.serverConnection_mock.recv.return_value = self.updateTaskEvent
148 
149  expect_GetAVGResourceEvent = self.eventBuilder.build(EVENT_TYPES.GET_AVG_RESOURCES, None)
150  expect_GetAVGResourceEvent.uid = self.updateTaskEvent.uid
151  match_event = Matcher(matchGetAVGResourceEvent, expect_GetAVGResourceEvent)
152 
153  self.scheduler.poll()
154 
155  self.assertEqual(self.scheduler.waitResourcesTasks[self.updateTask.id], True, "")
156  self.assertEqual(self.scheduler.waitResourcesEvents[self.updateTaskEvent.uid], self.updateTaskEvent, "")
157  self.resourcesManager_mock.send.assert_called_once_with(match_event)
158 
159 
161  #add pending task
162  self.scheduler.waitResourcesTasks[self.taskId] = True
163 
164  self.serverConnection_mock.recv.return_value = self.updateTaskEvent
165 
166  self.scheduler.poll()
167 
168  self.assertEqual(self.resourcesManager_mock.send.call_count, 0, "event send to resourceManager")
169  self.serverConnection_mock.send.assert_called_once_with(ANY)
170 
171 
173  #add pending task
174  self.scheduler.waitResourcesTasks[self.taskId] = True
175 
176  self.serverConnection_mock.recv.return_value = self.deleteTaskEvent
177 
178  self.scheduler.poll()
179 
180  self.assertEqual(self.resourcesManager_mock.send.call_count, 0, "event send to resourceManager")
181  self.serverConnection_mock.send.assert_called_once_with(ANY)
182 
183 
185  self.serverConnection_mock.recv.return_value = self.deleteTaskEvent
186 
187  self.scheduler.poll()
188 
189  self.serverConnection_mock.send.assert_called_once_with(ANY)
190 
191 
193  self.scheduler.modifyTaskInSchedule(self.newTask)
194  self.scheduler.checkDBIState()
195 
196  self.serverConnection_mock.recv.return_value = self.deleteTaskEvent
197 
198  self.scheduler.poll()
199 
200  self.serverConnection_mock.send.assert_called_once_with(ANY)
201  self.assertTrue(self.check_task_not_in_dbi(self.newTask), "task is still in schedule")
202 
203  self.delete_task_in_dbi(self.newTask)
204 
205 
207  self.newTaskEvent.connect_name = Scheduler.SERVER
208  self.scheduler.addPendingEvent(self.newTaskEvent)
209  self.resourceAVGEvent.uid = self.newTaskEvent.uid
210 
211  self.serverConnection_mock.recv.return_value = self.resourceAVGEvent
212  match_event = Matcher(matchEventUID, self.newTaskEvent)
213 
214  self.scheduler.poll()
215 
216  self.assertEqual(len(self.scheduler.waitResourcesTasks), 0, "")
217  self.assertEqual(len(self.scheduler.waitResourcesEvents), 0, "")
218  self.assertTrue(self.check_task_in_dbi(self.newTask), "")
219  self.serverConnection_mock.send.assert_called_once_with(match_event)
220 
221  self.delete_task_in_dbi(self.newTask)
222 
223 
225  self.serverConnection_mock.recv.return_value = self.resourceAVGEvent
226  match_event = Matcher(matchEventUID, self.newTaskEvent)
227 
228  self.scheduler.poll()
229 
230 
232  self.updateTaskEvent.connect_name = Scheduler.SERVER
233  self.scheduler.addPendingEvent(self.updateTaskEvent)
234  self.scheduler.modifyTaskInSchedule(self.newTask)
235  self.resourceAVGEvent.uid = self.updateTaskEvent.uid
236 
237  self.serverConnection_mock.recv.return_value = self.resourceAVGEvent
238  match_event = Matcher(matchEventUID, self.updateTaskEvent)
239 
240  self.scheduler.poll()
241 
242  self.assertEqual(len(self.scheduler.waitResourcesTasks), 0, "")
243  self.assertEqual(len(self.scheduler.waitResourcesEvents), 0, "")
244  self.assertTrue(self.check_task_in_dbi(self.updateTask), "")
245  self.serverConnection_mock.send.assert_called_once_with(match_event)
246 
247  self.delete_task_in_dbi(self.newTask)
248 
249 
251  self.newTaskEvent.connect_name = Scheduler.SERVER
252  self.scheduler.addPendingEvent(self.newTaskEvent)
253  self.resourceAVGEvent.uid = self.newTaskEvent.uid
254  self.resourceAVG.cpu = 200
255 
256  self.serverConnection_mock.recv.return_value = self.resourceAVGEvent
257 
258  self.scheduler.poll()
259 
260  self.assertEqual(len(self.scheduler.waitResourcesTasks), 0, "")
261  self.assertEqual(len(self.scheduler.waitResourcesEvents), 0, "")
262  self.assertTrue(self.check_task_not_in_dbi(self.updateTask), "")
263  self.serverConnection_mock.send.assert_called_once_with(ANY)
264 
265 
267  self.newTaskEvent.connect_name = Scheduler.SERVER
268  self.scheduler.addPendingEvent(self.newTaskEvent)
269  self.scheduler.modifyTaskInSchedule(self.newTask)
270  self.resourceAVGEvent.uid = self.newTaskEvent.uid
271 
272  self.serverConnection_mock.recv.return_value = self.resourceAVGEvent
273 
274  self.scheduler.poll()
275 
276  self.assertEqual(len(self.scheduler.waitResourcesTasks), 0, "")
277  self.assertEqual(len(self.scheduler.waitResourcesEvents), 0, "")
278  self.assertTrue(self.check_task_in_dbi(self.updateTask), "")
279  self.serverConnection_mock.send.assert_called_once_with(ANY)
280 
281  self.delete_task_in_dbi(self.newTask)
282 
283 
285  self.updateTaskEvent.connect_name = Scheduler.SERVER
286  self.scheduler.addPendingEvent(self.updateTaskEvent)
287  self.resourceAVGEvent.uid = self.updateTaskEvent.uid
288 
289  self.serverConnection_mock.recv.return_value = self.resourceAVGEvent
290 
291  self.scheduler.poll()
292 
293  self.assertEqual(len(self.scheduler.waitResourcesTasks), 0, "")
294  self.assertEqual(len(self.scheduler.waitResourcesEvents), 0, "")
295  self.serverConnection_mock.send.assert_called_once_with(ANY)
296 
297 
299  self.serverConnection_mock.recv.return_value = self.getScheduledTasksEvent
300  expect_response_event = self.eventBuilder.build(EVENT_TYPES.GET_SCHEDULED_TASKS_RESPONSE, self.getScheduledTasksResponse)
301  match_event = Matcher(matchGetScheduledTasks, expect_response_event)
302 
303  self.scheduler.poll()
304 
305  self.serverConnection_mock.send.assert_called_once_with(match_event)
306 
307 
309  self.serverConnection_mock.recv.return_value = self.getScheduledTasksEvent
310  expect_response_event = self.eventBuilder.build(EVENT_TYPES.GET_SCHEDULED_TASKS_RESPONSE, self.getScheduledTasksResponse)
311  match_event = Matcher(matchGetScheduledTasks, expect_response_event)
312 
313  self.scheduler.dbi.errorCode = dbi_const.DBI_DELETE_ERROR_CODE
314 
315  self.scheduler.poll()
316 
317  self.serverConnection_mock.send.assert_called_once_with(match_event)
318 
319 
321  self.scheduler.modifyTaskInSchedule(self.newTask)
323 
324  self.serverConnection_mock.recv.return_value = self.getScheduledTasksEvent
325  expect_response_event = self.eventBuilder.build(EVENT_TYPES.GET_SCHEDULED_TASKS_RESPONSE, self.getScheduledTasksResponse)
326  match_event = Matcher(matchGetScheduledTasks, expect_response_event)
327 
328  self.scheduler.poll()
329 
330  self.serverConnection_mock.send.assert_called_once_with(match_event)
331  self.check_task_in_dbi(self.newTask, SELECTED_EE)
332 
333  self.delete_task_in_dbi(self.newTask)
334 
335 
337  schedulerTask = SchedulerTask()
338  schedulerTask.id = self.newTask.id
339  schedulerTask.state = SELECTED_EE
340  self.scheduler.dbi.insert(SchedulerTaskScheme(schedulerTask))
341  self.scheduler.checkDBIState()
342 
343  self.serverConnection_mock.recv.return_value = self.getScheduledTasksEvent
344  expect_response_event = self.eventBuilder.build(EVENT_TYPES.GET_SCHEDULED_TASKS_RESPONSE, self.getScheduledTasksResponse)
345  match_event = Matcher(matchGetScheduledTasks, expect_response_event)
346 
347  self.scheduler.poll()
348 
349  self.serverConnection_mock.send.assert_called_once_with(match_event)
350 
351  self.delete_task_in_dbi(self.newTask)
352 
353 
355  tasks = [self.newTask, NewTask("ls", 2), NewTask("ls", 3)]
356  for task in tasks:
357  self.scheduler.modifyTaskInSchedule(task)
358  self.scheduler.checkDBIState()
359 
361  self.serverConnection_mock.recv.return_value = self.getScheduledTasksEvent
362  expect_response_event = self.eventBuilder.build(EVENT_TYPES.GET_SCHEDULED_TASKS_RESPONSE, self.getScheduledTasksResponse)
363  match_event = Matcher(matchGetScheduledTasks, expect_response_event)
364 
365  self.scheduler.poll()
366 
367  self.serverConnection_mock.send.assert_called_once_with(match_event)
368  for task in tasks:
369  self.check_task_in_dbi(task, SELECTED_EE)
370 
371  for task in tasks:
372  self.delete_task_in_dbi(task)
UpdateTask event object, for update task field operation.
NewTask event object, defines the Task object fields.
Class describes structures of task item used in Scheduler.
DeleteTask event object, to delete task from DTM application and from EE.
def matchGetScheduledTasks(first, second)
ResourcesAVG event object, represents summary of the EE resources utilization.
Definition: dbi.py:1
def matchEventUID(first, second)
GetScheduledTasks event object, to get tasks per time slot range from the Scheduler.
Class hides routines of bulding connection objects.
The Scheduler object implements algorithms of tasks scheduling.
Definition: Scheduler.py:52
def matchGetAVGResourceEvent(first, second)
def check_task_in_dbi(self, task, state_condition=None)
GetScheduledTasksResponse event object, to return list of task from the Scheduler.