HCE Project Python language Distributed Tasks Manager Application, Distributed Crawler Application and client API bindings.  2.0.0-chaika
Hierarchical Cluster Engine Python language binding
test_dtm_TasksManager.py
Go to the documentation of this file.
1 '''
2 Created on Mar 11, 2014
3 
4 @author: igor
5 '''
6 import unittest
7 from mock import MagicMock, call, ANY
8 import ConfigParser
9 import tempfile
10 import os
11 from app.BaseServerManager import BaseServerManager
12 from dtm.TasksManager import TasksManager, DBIErr
13 from app.PollerManager import PollerManager
14 from transport.ConnectionBuilderLight import ConnectionBuilderLight
15 from transport.ConnectionLight import ConnectionLight
16 from transport.Event import EventBuilder
17 from dtm.EventObjects import NewTask, UpdateTaskFields, UpdateTask, DeleteTask, EEResponseData
18 from dtm.EventObjects import FetchTasksResultsFromCache, GetTasksStatus, GetTaskManagerFields, DeleteTaskData
19 from dtm.EventObjects import GeneralResponse
20 from dtm.EventObjects import FetchAvailabelTaskIds, AvailableTaskIds
21 from dtm.Constants import EVENT_TYPES
22 from dtm.TaskBackLogScheme import TaskBackLogScheme
23 from dtm.TaskLogScheme import TaskLogScheme
24 from dtm.TaskLog import TaskLog
25 from dbi.dbi import DBI
26 import transport.Consts as tr_consts
27 
28 import logging
29 FORMAT = '%(asctime)s - %(thread)ld - %(threadName)s - %(name)s - %(funcName)s - %(levelname)s - %(message)s'
30 logging.basicConfig(level=logging.DEBUG, format=FORMAT) #basic logging configuration
31 
32 #import logging
33 #logging.basicConfig(level=logging.DEBUG) #basic logging configuration
34 
35 
36 
37 class Matcher(object):
38 
39  def __init__(self, compare, some_obj):
40  self.compare = compare
41  self.some_obj = some_obj
42 
43  def __eq__(self, other):
44  return self.compare(self.some_obj, other)
45 
46 
47 def matchShedulerNewEvents(first, second):
48  files = first.eventObj.files
49  first.eventObj.files = None
50  ret = False
51  if first.eventObj.__dict__ == second.eventObj.__dict__ and first.eventType == second.eventType:
52  ret = True
53  first.eventObj.files = files
54  return ret
55 
56 
57 def matchDeleteTasksEvent(first, second):
58  return first.eventType == second.eventType and first.eventObj.id == second.eventObj.id
59 
60 
61 class TestTasksManager(unittest.TestCase):
62 
63 
64  def setUp(self):
65  self.config = ConfigParser.RawConfigParser()
66  cfg_section = "TasksManager"
67 
68  tf = tempfile.NamedTemporaryFile()
69  sql_name = "sqlite:///" + os.path.basename(tf.name) + ".db"
70 
71  self.config.add_section(cfg_section)
72  self.config.set(cfg_section, TasksManager.SERVER, "TasksManager")
73  self.config.set(cfg_section, TasksManager.TASKS_DATA_MANAGER_CLIENT, "TasksDataManager")
74  self.config.set(cfg_section, TasksManager.SCHEDULER_CLIENT, "Scheduler")
75  self.config.set(cfg_section, "db_name", sql_name)
76 
77  connectionBuilderLight = ConnectionBuilderLight()
78  self.adminServerFake = connectionBuilderLight.build(tr_consts.SERVER_CONNECT, BaseServerManager.ADMIN_CONNECT_ENDPOINT)
79 
80  self.pollerManager_mock = MagicMock(spec=PollerManager)
81  self.connectionBuilderLight_mock = MagicMock(spec=ConnectionBuilderLight )
82 
83  self.dbi_mock = MagicMock(DBI)
84  self.serverConnection_mock = MagicMock(spec=ConnectionLight)
85  self.tasksDataManager_mock = MagicMock(spec=ConnectionLight)
86  self.scheduler_mock = MagicMock(spec=ConnectionLight)
88 
89  self.connectionBuilderLight_mock.build.side_effect = self.connections_mock
90 
93 
94  self.taskId = 1
95  self.newTask = NewTask("ls", self.taskId)
100 
101 
102  def tearDown(self):
103  lookBackTaskLogScheme = TaskBackLogScheme(self.newTask)
104  self.tasksManager.dbi.delete(lookBackTaskLogScheme, "id=%s" % self.newTask.id)
105  self.adminServerFake.close()
106 
107 
108  def check_task_in_dbi(self, task, full_compare=True):
109  lookBackTaskLogScheme = TaskBackLogScheme(task)
110  resBackTaskLogScheme = self.tasksManager.dbi.fetch(lookBackTaskLogScheme, "id=%s" % task.id)[0]
111  resTaskLog = resBackTaskLogScheme._getTaskLog()
112 
113  if not full_compare:
114  if task.id == resTaskLog.id:
115  return True
116  else:
117  return False
118 
119  if task.__dict__ == resTaskLog.__dict__:
120  return True
121  return False
122 
123 
124  def test_add_new_task(self):
125  newTaskEvent = self.eventBuilder.build(EVENT_TYPES.NEW_TASK, self.newTask)
126  self.pollerManager_mock.poll.return_value = [self.tasksManager.SERVER]
127  self.serverConnection_mock.recv.return_value = newTaskEvent
128 
129  match_event = Matcher(matchShedulerNewEvents, newTaskEvent)
130 
131  self.tasksManager.poll()
132 
133  self.assertEqual(len(self.tasksManager.tasksQueue), 0, "task is in tasksQueue")
134  self.assertEqual(len(self.tasksManager.pendingTasks), 1, "")
135  self.tasksDataManager_mock.send.assert_called_once_with(match_event)
136 
137 
138 
140 
141  newTaskEvent = self.eventBuilder.build(EVENT_TYPES.SCHEDULE_TASK, self.newTask)
142  self.tasksManager.onNewTask(newTaskEvent)
143 
144  generalResponseEvent = self.eventBuilder.build(EVENT_TYPES.NEW_TASK_RESPONSE, self.generalResponse)
145  generalResponseEvent.uid = newTaskEvent.uid
146  self.pollerManager_mock.poll.return_value = [self.tasksManager.SERVER]
147  self.serverConnection_mock.recv.return_value = generalResponseEvent
148 
149  match_event = Matcher(matchShedulerNewEvents, newTaskEvent)
150  taskLog = self.tasksManager.createTaskLog(self.newTask)
151  self.tasksManager.poll()
152 
153  self.assertEqual(len(self.tasksManager.tasksQueue), 1, "task isnot in tasksQueue")
154  self.assertEqual(len(self.tasksManager.pendingTasks), 1, "")
155  self.assertTrue(self.check_task_in_dbi(taskLog, False), "insertion in dbi is broken")
156  self.scheduler_mock.send.assert_called_once_with(match_event)
157 
158 
160  req = FetchAvailabelTaskIds(100)
161  event = self.eventBuilder.build(EVENT_TYPES.FETCH_AVAILABLE_TASK_IDS, req)
162  event.connect_identity = "333"
163  taskLog1 = TaskLog()
164  taskLog1.id = "1"
165  taskLog2 = TaskLog()
166  taskLog2.id = "2"
167  taskLog3 = TaskLog()
168  taskLog3.id = "3"
169  self.tasksManager.dbi = self.dbi_mock
170  self.dbi_mock.sql.return_value = [TaskLogScheme(taskLog1), TaskLogScheme(taskLog2), TaskLogScheme(taskLog3)]
171  event.connect_name = TasksManager.SERVER
172  self.tasksManager.onFetchAvailableTasks(event)
173  args = self.serverConnection_mock.send.call_args
174  self.assertEqual(args[0][0].eventObj.ids, ["1", "2", "3"])
175 
177 
178  newTaskEvent = self.eventBuilder.build(EVENT_TYPES.NEW_TASK, self.newTask)
179  newTaskEvent.connect_name = "server"
180  self.tasksManager.onNewTask(newTaskEvent)
181 
182  self.generalResponse.errorCode = self.generalResponse.ERROR_OK + 1
183  generalResponseEvent = self.eventBuilder.build(EVENT_TYPES.NEW_TASK_RESPONSE, self.generalResponse)
184  generalResponseEvent.uid = newTaskEvent.uid
185  self.pollerManager_mock.poll.return_value = [self.tasksManager.SERVER]
186  self.serverConnection_mock.recv.return_value = generalResponseEvent
187 
188  self.tasksManager.addConnection("server", self.serverConnection_mock)
189 
190  self.tasksManager.poll()
191 
192  self.assertEqual(len(self.tasksManager.tasksQueue), 0, "task is in tasksQueue")
193  self.assertEqual(len(self.tasksManager.pendingTasks), 0, "")
194  self.serverConnection_mock.send.assert_called_once_with(ANY) #send response
195  self.assertEqual(self.scheduler_mock.send.call_count, 0, "")
196 
197 
198 
200 
201  newTaskEvent = self.eventBuilder.build(EVENT_TYPES.NEW_TASK, self.newTask)
202  newTaskEvent.connect_name = "server"
203  self.tasksManager.onNewTask(newTaskEvent)
204 
205  generalResponseEvent = self.eventBuilder.build(EVENT_TYPES.NEW_TASK_RESPONSE, self.generalResponse)
206  generalResponseEvent.uid = newTaskEvent.uid
207  self.pollerManager_mock.poll.return_value = [self.tasksManager.SERVER]
208  self.serverConnection_mock.recv.return_value = generalResponseEvent
209 
210  self.tasksManager.addConnection("server", self.serverConnection_mock)
211 
212  self.tasksManager.tasksQueue[self.newTask.id] = self.newTask
213 
214  self.tasksManager.dbi = self.dbi_mock
215  self.dbi_mock.insert.side_effect = DBIErr(11, "some err")
216 
217  #self.tasksManager.poll()
218  deleteTaskData = DeleteTaskData(self.taskId)
219  deleteEvent = self.eventBuilder.build(EVENT_TYPES.DELETE_TASK_DATA, deleteTaskData)
220 
221  match_event = Matcher(matchDeleteTasksEvent, deleteEvent)
222 
223  self.tasksManager.poll()
224 
225  self.assertEqual(len(self.tasksManager.tasksQueue), 0, "task is in tasksQueue")
226  self.assertEqual(len(self.tasksManager.pendingTasks), 0, "")
227  self.tasksDataManager_mock.send.assert_called_with(match_event)
228  self.serverConnection_mock.send.assert_called_once_with(ANY) #send response
229 
230 
232 
233  newTaskEvent = self.eventBuilder.build(EVENT_TYPES.NEW_TASK, self.newTask)
234  newTaskEvent.connect_name = "server"
235  self.tasksManager.onNewTask(newTaskEvent)
236 
237  generalResponseEvent = self.eventBuilder.build(EVENT_TYPES.NEW_TASK_RESPONSE, self.generalResponse)
238  generalResponseEvent.uid = newTaskEvent.uid
239  self.tasksManager.onTasksManagerGeneralResponse(generalResponseEvent)
240 
241  self.pollerManager_mock.poll.return_value = [self.tasksManager.SERVER]
242  generalResponseErr = GeneralResponse(GeneralResponse.ERROR_OK + 1, "")
243  generalResponseErrEvent = self.eventBuilder.build(EVENT_TYPES.SCHEDULE_TASK_RESPONSE, generalResponseErr)
244  generalResponseErrEvent.uid = newTaskEvent.uid
245  self.serverConnection_mock.recv.return_value = generalResponseErrEvent
246 
247  self.tasksManager.addConnection("server", self.serverConnection_mock)
248 
249  self.tasksManager.poll()
250 
251  self.assertEqual(len(self.tasksManager.tasksQueue), 0, "task is in tasksQueue")
252  self.assertEqual(len(self.tasksManager.pendingTasks), 0, "")
253  self.serverConnection_mock.send.assert_called_once_with(ANY) #send response
254 
255 
257 
258  newTaskEvent = self.eventBuilder.build(EVENT_TYPES.NEW_TASK, self.newTask)
259  newTaskEvent.connect_name = "server"
260  self.tasksManager.onNewTask(newTaskEvent)
261 
262  generalResponseEvent = self.eventBuilder.build(EVENT_TYPES.SCHEDULE_TASK_RESPONSE, self.generalResponse)
263  generalResponseEvent.uid = newTaskEvent.uid
264  self.tasksManager.onTasksManagerGeneralResponse(generalResponseEvent)
265 
266  self.pollerManager_mock.poll.return_value = [self.tasksManager.SERVER]
267  self.serverConnection_mock.recv.return_value = generalResponseEvent
268 
269  self.tasksManager.addConnection("server", self.serverConnection_mock)
270  taskLog = self.tasksManager.createTaskLog(self.newTask)
271 
272  self.tasksManager.poll()
273 
274  self.assertEqual(len(self.tasksManager.tasksQueue), 1, "task is in tasksQueue")
275  self.assertEqual(len(self.tasksManager.pendingTasks), 0, "")
276  self.serverConnection_mock.send.assert_called_once_with(ANY) #send response
277  self.assertTrue(self.check_task_in_dbi(taskLog, False), "insertion in dbi is broken")
278 
279 
280  def test_update_task(self):
281  #add task
282  self.tasksManager.tasksQueue[self.newTask.id] = self.newTask
283 
284  updateTaskEvent = self.eventBuilder.build(EVENT_TYPES.UPDATE_TASK, self.updateTask)
285  self.pollerManager_mock.poll.return_value = [self.tasksManager.SERVER]
286  self.serverConnection_mock.recv.return_value = updateTaskEvent
287 
288  match_event = Matcher(matchShedulerNewEvents, updateTaskEvent)
289  self.tasksManager.poll()
290 
291  self.assertEqual(len(self.tasksManager.tasksQueue), 1, "task isnt in tasksQueue")
292  self.tasksDataManager_mock.send.assert_called_once_with(updateTaskEvent)
293  self.scheduler_mock.send.assert_called_once_with(match_event)
294  self.serverConnection_mock.send.assert_called_once_with(ANY)
295 
296 
298  updateTaskEvent = self.eventBuilder.build(EVENT_TYPES.UPDATE_TASK, self.updateTask)
299  self.pollerManager_mock.poll.return_value = [self.tasksManager.SERVER]
300  self.serverConnection_mock.recv.return_value = updateTaskEvent
301 
302  self.tasksManager.poll()
303 
304  self.assertEqual(self.tasksDataManager_mock.send.call_count, 0, "event send to tasksDataManager")
305  self.assertEqual(self.scheduler_mock.send.call_count, 0, "event send to scheduler")
306  self.serverConnection_mock.send.assert_called_once_with(ANY)
307 
308 
309  def test_delete_tasks(self):
310  #add task
311  newTaskEvent = self.eventBuilder.build(EVENT_TYPES.NEW_TASK, self.newTask)
312  self.tasksManager.addNewTaskData(newTaskEvent)
313 
314  print self.check_task_in_dbi(self.newTask, False)
315  #self.tasksManager.tasksQueue[self.newTask.id] = self.newTask
316 
317  deleteTaskEvent = self.eventBuilder.build(EVENT_TYPES.DELETE_TASK, self.deleteTasks)
318  self.pollerManager_mock.poll.return_value = [self.tasksManager.SERVER]
319  self.serverConnection_mock.recv.return_value = deleteTaskEvent
320 
321  sendTaskEvent = self.eventBuilder.build(EVENT_TYPES.NEW_TASK, self.deleteTasks)
322  match_event = Matcher(matchDeleteTasksEvent, sendTaskEvent)
323 
324  self.tasksManager.poll()
325 
326  self.assertEqual(len(self.tasksManager.tasksQueue), 1, "task is in tasksQueue")
327  self.tasksDataManager_mock.send.assert_called_once_with(match_event)
328  self.assertEqual(self.scheduler_mock.send.call_count, 0, "")
329  self.assertEqual(self.serverConnection_mock.send.call_count, 0, "")
330 
331 
333  deleteTaskEvent = self.eventBuilder.build(EVENT_TYPES.DELETE_TASK, self.deleteTasks)
334  self.pollerManager_mock.poll.return_value = [self.tasksManager.SERVER]
335  self.serverConnection_mock.recv.return_value = deleteTaskEvent
336 
337  self.tasksManager.poll()
338 
339  self.serverConnection_mock.send.assert_called_once_with(ANY)
340 
341 
343  #add task
344  self.tasksManager.tasksQueue[self.newTask.id] = self.newTask
345 
346  deleteTaskEvent = self.eventBuilder.build(EVENT_TYPES.DELETE_TASK, self.deleteTasks)
347  self.pollerManager_mock.poll.return_value = [self.tasksManager.SERVER]
348  self.serverConnection_mock.recv.return_value = deleteTaskEvent
349 
350  old_dbi = self.tasksManager.dbi
351  self.tasksManager.dbi = self.dbi_mock
352  self.dbi_mock.delete.side_effect = DBIErr(11, "some err")
353 
354  match_event = Matcher(matchDeleteTasksEvent, deleteTaskEvent)
355 
356  self.tasksManager.poll()
357 
358  self.serverConnection_mock.send.assert_called_once_with(ANY)
359 
360  self.tasksManager.dbi = old_dbi
361 
362 
364  #add task
365  self.tasksManager.tasksQueue[self.newTask.id] = self.newTask
366 
367  fetchTasksResultsFromCacheEvent = self.eventBuilder.build(EVENT_TYPES.FETCH_RESULTS_CACHE, self.fetchTasksResultsFromCache)
368  self.pollerManager_mock.poll.return_value = [self.tasksManager.SERVER]
369  self.serverConnection_mock.recv.return_value = fetchTasksResultsFromCacheEvent
370 
371  self.tasksManager.poll()
372 
373  self.assertEqual(len(self.tasksManager.fetchEvents), 1, "")
374  self.tasksDataManager_mock.send.assert_called_once_with(fetchTasksResultsFromCacheEvent)
375 
376 
378  fetchTasksResultsFromCacheEvent = self.eventBuilder.build(EVENT_TYPES.FETCH_RESULTS_CACHE, self.fetchTasksResultsFromCache)
379  self.pollerManager_mock.poll.return_value = [self.tasksManager.SERVER]
380  self.serverConnection_mock.recv.return_value = fetchTasksResultsFromCacheEvent
381 
382  self.tasksManager.poll()
383 
384  self.assertEqual(len(self.tasksManager.fetchEvents), 0, "")
385  self.assertEqual(self.tasksDataManager_mock.send.call_count, 0, "event send to tasksDataManager")
386  self.serverConnection_mock.send.assert_called_once_with(ANY)
387 
388 
390  getTasksStatus = GetTasksStatus([self.taskId])
391  getTasksStatusEvent = self.eventBuilder.build(EVENT_TYPES.GET_TASK_STATUS, getTasksStatus)
392  self.pollerManager_mock.poll.return_value = [self.tasksManager.SERVER]
393  self.serverConnection_mock.recv.return_value = getTasksStatusEvent
394 
395  self.tasksManager.poll()
396  self.serverConnection_mock.send.assert_called_once_with(ANY)
397 
398 
400  getTaskManagerFields = GetTaskManagerFields(self.taskId)
401  getTaskManagerFieldsEvent = self.eventBuilder.build(EVENT_TYPES.GET_TASK_FIELDS, getTaskManagerFields)
402  self.pollerManager_mock.poll.return_value = [self.tasksManager.SERVER]
403  self.serverConnection_mock.recv.return_value = getTaskManagerFieldsEvent
404 
405  self.tasksManager.poll()
406  self.serverConnection_mock.send.assert_called_once_with(ANY)
407 
408 
410  updateTaskFields = UpdateTaskFields(self.taskId)
411  updateTaskFieldsEvent = self.eventBuilder.build(EVENT_TYPES.UPDATE_TASK_FIELDS, updateTaskFields)
412  self.pollerManager_mock.poll.return_value = [self.tasksManager.SERVER]
413  self.serverConnection_mock.recv.return_value = updateTaskFieldsEvent
414 
415  self.tasksManager.poll()
416 
417 
419  event = self.eventBuilder.build(EVENT_TYPES.FETCH_RESULTS_CACHE, "")
420  event.uid = "2"
421  event.connect_name = self.tasksManager.SERVER
422  self.tasksManager.fetchEvents[event.uid] = event
423 
424  eeResponseData = EEResponseData(self.taskId)
425  eeResponseDataEvent = self.eventBuilder.build(EVENT_TYPES.FETCH_TASK_RESULTS_RESPONSE, eeResponseData)
426  self.pollerManager_mock.poll.return_value = [self.tasksManager.SERVER]
427  self.serverConnection_mock.recv.return_value = eeResponseDataEvent
428 
429  self.tasksManager.poll()
430 
431  self.assertEqual(len(self.tasksManager.fetchEvents), 0, "event is still here")
432  self.serverConnection_mock.send.assert_called_once_with(ANY)
433 
434 
436  eeResponseData = EEResponseData(self.taskId)
437  eeResponseDataEvent = self.eventBuilder.build(EVENT_TYPES.FETCH_TASK_RESULTS_RESPONSE, eeResponseData)
438  self.pollerManager_mock.poll.return_value = [self.tasksManager.SERVER]
439  self.serverConnection_mock.recv.return_value = eeResponseDataEvent
440 
441  self.tasksManager.poll()
442 
443  self.assertEqual(len(self.tasksManager.fetchEvents), 0, "event is still here")
444  self.assertEqual(self.serverConnection_mock.send.call_count, 0, "send reply")
445 
446 
448  #add task
449  newTaskEvent = self.eventBuilder.build(EVENT_TYPES.NEW_TASK, self.newTask)
450  self.tasksManager.addNewTaskData(newTaskEvent)
451 
452  self.check_task_in_dbi(self.newTask, False)
453 
454  updateTaskFields = UpdateTaskFields(self.taskId)
455  updateTaskFields.fields["state"] = EEResponseData.TASK_STATE_TERMINATED
456 
457  updateTaskEvent = self.eventBuilder.build(EVENT_TYPES.UPDATE_TASK_FIELDS, updateTaskFields)
458  self.pollerManager_mock.poll.return_value = [self.tasksManager.SERVER]
459  self.serverConnection_mock.recv.return_value = updateTaskEvent
460 
461  self.tasksManager.poll()
462 
463  self.assertEqual(len(self.tasksManager.tasksQueue), 0, "task is in tasksQueue")
464  self.assertEquals(self.tasksDataManager_mock.send.call_count, 2, "")
465 
466  lookBackTaskLogScheme = TaskBackLogScheme(self.newTask)
467  resBackTaskLogScheme = self.tasksManager.dbi.fetch(lookBackTaskLogScheme, "id=%s" % self.taskId)[0]
468  assert resBackTaskLogScheme is None
469 
470  lookTaskLogScheme = TaskLogScheme(self.newTask)
471  resTaskLogScheme = self.tasksManager.dbi.fetch(lookTaskLogScheme, "id=%s" % self.taskId)[0]
472  assert resTaskLogScheme is not None
473 
474 if __name__ == '__main__':
475  unittest.main()
UpdateTask event object, for update task field operation.
UpdateTaskFields event object, for update task fields operation.
NewTask event object, defines the Task object fields.
GeneralResponse event object, represents general state response for multipurpose usage.
def matchDeleteTasksEvent(first, second)
FetchTasksResultsFromCache event object, for fetch task's results data from DTM application.
Class describes structures of task item used in TaskManager.
Definition: TaskLog.py:10
DeleteTaskData event object, to delete task's data in the storage.
DeleteTask event object, to delete task from DTM application and from EE.
Definition: dbi.py:1
GetTaskManagerFields event object, for get task fields values operation.
Class hides routines of bulding connection objects.
def __init__(self, compare, some_obj)
def check_task_in_dbi(self, task, full_compare=True)
GetTasksStatus event object, for check task status operation.
def matchShedulerNewEvents(first, second)
FetchAvailabelTaskIds event object, for fetch available task id.
EEResponseData event object, store task results data, returned from EE.