HCE Project Python language Distributed Tasks Manager Application, Distributed Crawler Application and client API bindings.  2.0.0-chaika
Hierarchical Cluster Engine Python language binding
test_dtm_TasksStateUpdateService.py
Go to the documentation of this file.
1 '''
2 HCE project, Python bindings, Distributed Tasks Manager application.
3 TasksStateUpdateService object functional tests.
4 
5 @package: dtm
6 @author bgv bgv.hce@gmail.com
7 @link: http://hierarchical-cluster-engine.com/
8 @copyright: Copyright © 2013-2014 IOIX Ukraine
9 @license: http://hierarchical-cluster-engine.com/license/
10 @since: 0.1
11 '''
12 
13 import ConfigParser
14 import time
15 import unittest
16 import logging
17 from dtm.TasksStateUpdateService import TasksStateUpdateService
18 from app.BaseServerManager import BaseServerManager
19 from transport.ConnectionBuilderLight import ConnectionBuilderLight
20 from transport.ConnectionBuilder import ConnectionBuilder
21 from transport.IDGenerator import IDGenerator
22 from transport.Connection import ConnectionParams
23 from transport.Request import Request
24 from transport.Event import EventBuilder
25 import transport.Consts as TRANSPORT_CONSTS
26 from drce.CommandConvertor import CommandConvertor
27 import zmq
28 from dtm import Constants as DTM_CONSTS
29 from dtm.EventObjects import UpdateTaskFields, AvailableTaskIds, CheckTaskState
30 
31 FORMAT = '%(asctime)s - %(threadName)s - %(name)s - %(funcName)s - %(levelname)s - %(message)s'
32 logging.basicConfig(level=logging.INFO, format=FORMAT)
33 
34 class TestTasksStateUpdateService(unittest.TestCase):
35 
36  #Test TasksStateUpdateService instantiation
37  CONFIG_SECTION = "TasksStateUpdateService"
38  SERVICE_BIND_IP = "127.0.0.1"
39  SERVICE_BIND_PORT = "5500"
40 
41  def __init__(self, *args, **kvargs):
42  unittest.TestCase.__init__(self, *args, **kvargs)
44 
45  def setUp(self):
46  if not hasattr(self, "inited"):
47  self.inited = True
48 
49  config = ConfigParser.RawConfigParser()
50  config.add_section(self.CONFIG_SECTION)
51  config.set(self.CONFIG_SECTION, "clientTasksManager", "TasksManager")
52  config.set(self.CONFIG_SECTION, "clientExecutionEnvironmentManager","EEManager")
53  config.set(self.CONFIG_SECTION, "serverHost", self.SERVICE_BIND_IP)
54  config.set(self.CONFIG_SECTION, "serverPort", self.SERVICE_BIND_PORT)
55 
56  #Dependent objects creation
57  idGenerator = IDGenerator()
58  connectionBuilderLight = ConnectionBuilderLight()
59  connectionBuilder = ConnectionBuilder(idGenerator)
60  #create AdminServer
61  self.adminServer = connectionBuilderLight.build(TRANSPORT_CONSTS.SERVER_CONNECT, BaseServerManager.ADMIN_CONNECT_ENDPOINT)
62  self.taskManagerConnServer = connectionBuilderLight.build(TRANSPORT_CONSTS.SERVER_CONNECT, "TasksManager")
63  self.eeManagerServer = connectionBuilderLight.build(TRANSPORT_CONSTS.SERVER_CONNECT, "EEManager")
64  # time.sleep(1)
66  self.tsus.pollTimeout = 100
67  # start the threaded server
68  self.tsus.start()
69  self.hceNode = connectionBuilder.build(TRANSPORT_CONSTS.DATA_CONNECT_TYPE, ConnectionParams(self.SERVICE_BIND_IP, self.SERVICE_BIND_PORT), TRANSPORT_CONSTS.CLIENT_CONNECT)
71 
72  def tearDown(self):
73  self.hceNode.close()
74  self.adminServer.close()
75  self.eeManagerServer.close()
76  self.tsus.exit_flag = True
77  time.sleep(0.3)
78  self.taskManagerConnServer.close()
79  time.sleep(0.2)
80  for conn in self.tsus.connections.itervalues():
81  conn.close()
82 
84  req = Request("1")
85  req.add_data('''[{
86  "error_code": 0,
87  "error_message": "",
88  "id": "123",
89  "type": 0,
90  "host": "",
91  "port": 0,
92  "state": 0,
93  "pid": 1186,
94  "stdout": "",
95  "stderror": "",
96  "exit_status": 0,
97  "files": [],
98  "node":"node for test",
99  "time": 0
100  }]
101  ''')
102  self.hceNode.send(req)
103  time.sleep(0.3)
104  #TasksStateUpdateService send UpdateTaskFields message to TaskManager
105  pollResult = self.taskManagerConnServer.poll()
106  self.assertEqual(zmq.POLLIN, pollResult & zmq.POLLIN)
107  event = self.taskManagerConnServer.recv()
108  self.assertEqual(event.eventType, DTM_CONSTS.EVENT_TYPES.UPDATE_TASK_FIELDS)
109  self.assertTrue(isinstance(event.eventObj, UpdateTaskFields))
110  self.assertEqual(event.eventObj.id, "123")
111  self.assertEqual(event.eventObj.fields["state"], 0)
112  self.assertEqual(event.eventObj.fields["pId"], 1186)
113  self.assertEqual(event.eventObj.fields["nodeName"], "node for test")
114 
116  req = Request("1")
117  req.add_data('''[{
118  "error_code": 3,
119  "error_message": "error message for test",
120  "id": "1",
121  "type": 0,
122  "host": "",
123  "port": 0,
124  "state": 0,
125  "pid": 1186,
126  "stdout": "",
127  "stderror": "",
128  "exit_status": 0,
129  "files": [],
130  "node":"node for test",
131  "time": 0
132  }]
133  ''')
134  self.hceNode.send(req)
135  time.sleep(0.3)
136  pollResult = self.taskManagerConnServer.poll()
137  #TasksStateService don't send message to TaskManager
138  if pollResult & zmq.POLLIN == zmq.POLLIN:
139  event = self.taskManagerConnServer.recv()
140  self.assertNotEqual(event.eventType, DTM_CONSTS.EVENT_TYPES.UPDATE_TASK_FIELDS)
141 
142  def testCheckState(self):
143  allAvailableTaskIds = ["1","2"]
144  TasksStateUpdateService.FETCH_TASKS_IDS_INTERVAL = 1
145 
146  #test send CheckTaskState
147  time.sleep(TasksStateUpdateService.FETCH_TASKS_IDS_INTERVAL)
148  pollResult = self.taskManagerConnServer.poll(0.1)
149  self.assertEqual(zmq.POLLIN, zmq.POLLIN & pollResult)
150  event = self.taskManagerConnServer.recv()
151  self.assertEqual(event.eventType, DTM_CONSTS.EVENT_TYPES.FETCH_AVAILABLE_TASK_IDS)
152  res = AvailableTaskIds(allAvailableTaskIds)
153  responseEvent = self.eventBuilder.build(DTM_CONSTS.EVENT_TYPES.AVAILABLE_TASK_IDS_RESPONSE, res)
154  responseEvent.connect_identity = event.connect_identity
155  self.taskManagerConnServer.send(responseEvent)
156  time.sleep(0.1)
157 
158  #check the EEManager received the CheckTaskState
159  pollResult = self.eeManagerServer.poll(0.1)
160  self.assertEqual(zmq.POLLIN, zmq.POLLIN & pollResult)
161  event = self.eeManagerServer.recv()
162  self.assertEqual(event.eventType, DTM_CONSTS.EVENT_TYPES.CHECK_TASK_STATE)
163  self.assertTrue(isinstance(event.eventObj, CheckTaskState))
164  firstId = event.eventObj.id
165  self.assertIn(firstId, allAvailableTaskIds)
166 
167  #test send CheckTaskState again
168  time.sleep(TasksStateUpdateService.FETCH_TASKS_IDS_INTERVAL)
169  pollResult = self.taskManagerConnServer.poll(0.1)
170  self.assertEqual(zmq.POLLIN, zmq.POLLIN & pollResult)
171  event = self.taskManagerConnServer.recv()
172  self.assertEqual(event.eventType, DTM_CONSTS.EVENT_TYPES.FETCH_AVAILABLE_TASK_IDS)
173  res = AvailableTaskIds(allAvailableTaskIds)
174  responseEvent = self.eventBuilder.build(DTM_CONSTS.EVENT_TYPES.AVAILABLE_TASK_IDS_RESPONSE, res)
175  responseEvent.connect_identity = event.connect_identity
176  self.taskManagerConnServer.send(responseEvent)
177  time.sleep(0.1)
178 
179  pollResult = self.eeManagerServer.poll(0.1)
180  self.assertEqual(zmq.POLLIN, zmq.POLLIN & pollResult)
181  event = self.eeManagerServer.recv()
182  self.assertEqual(event.eventType, DTM_CONSTS.EVENT_TYPES.CHECK_TASK_STATE)
183  self.assertTrue(isinstance(event.eventObj, CheckTaskState))
184  secondId = event.eventObj.id
185  self.assertIn(secondId, allAvailableTaskIds)
186  #test the duplicate task avoiding
187  self.assertNotEqual(firstId, secondId)
188 
189  #test send CheckTaskState again, all task sent, then it clear the sent records
190  time.sleep(TasksStateUpdateService.FETCH_TASKS_IDS_INTERVAL)
191  pollResult = self.taskManagerConnServer.poll(0.1)
192  self.assertEqual(zmq.POLLIN, zmq.POLLIN & pollResult)
193  event = self.taskManagerConnServer.recv()
194  self.assertEqual(event.eventType, DTM_CONSTS.EVENT_TYPES.FETCH_AVAILABLE_TASK_IDS)
195  res = AvailableTaskIds(allAvailableTaskIds)
196  responseEvent = self.eventBuilder.build(DTM_CONSTS.EVENT_TYPES.AVAILABLE_TASK_IDS_RESPONSE, res)
197  responseEvent.connect_identity = event.connect_identity
198  self.taskManagerConnServer.send(responseEvent)
199  time.sleep(0.1)
200 
201  #check the EEManager received the CheckTaskState
202  pollResult = self.eeManagerServer.poll(0.1)
203  self.assertEqual(zmq.POLLIN, zmq.POLLIN & pollResult)
204  event = self.eeManagerServer.recv()
205  self.assertEqual(event.eventType, DTM_CONSTS.EVENT_TYPES.CHECK_TASK_STATE)
206  self.assertTrue(isinstance(event.eventObj, CheckTaskState))
207  self.assertIn(event.eventObj.id, allAvailableTaskIds)
208 
209 if __name__ == "__main__":
210  unittest.main()
211 
AvailableTaskIds event object, used to return all available task ids.
The TasksStateUpdateService class, is a listener of tasks state updates from DRCE FO of cluster nodes...
Class hides routines of building connection objects.
It's a wrapper similar to zmsg.hpp in sense of encapsulation of hce message structure.
Definition: Request.py:11
The builder is used to encapsulate the routine of creating various types of connections.
IDGenerator is used to generate unique id for connections.
Definition: IDGenerator.py:15
Convertor which is used to convert Task*Request to JSON and TaskResponse from JSON.