2 HCE project, Python bindings, Distributed Tasks Manager application. 3 TasksStateUpdateService object functional tests. 6 @author bgv bgv.hce@gmail.com 7 @link: http://hierarchical-cluster-engine.com/ 8 @copyright: Copyright © 2013-2014 IOIX Ukraine 9 @license: http://hierarchical-cluster-engine.com/license/ 28 from dtm
import Constants
as DTM_CONSTS
29 from dtm.EventObjects import UpdateTaskFields, AvailableTaskIds, CheckTaskState
31 FORMAT =
'%(asctime)s - %(threadName)s - %(name)s - %(funcName)s - %(levelname)s - %(message)s' 32 logging.basicConfig(level=logging.INFO, format=FORMAT)
37 CONFIG_SECTION =
"TasksStateUpdateService" 38 SERVICE_BIND_IP =
"127.0.0.1" 39 SERVICE_BIND_PORT =
"5500" 42 unittest.TestCase.__init__(self, *args, **kvargs)
46 if not hasattr(self,
"inited"):
49 config = ConfigParser.RawConfigParser()
51 config.set(self.
CONFIG_SECTION,
"clientTasksManager",
"TasksManager")
52 config.set(self.
CONFIG_SECTION,
"clientExecutionEnvironmentManager",
"EEManager")
61 self.
adminServer = connectionBuilderLight.build(TRANSPORT_CONSTS.SERVER_CONNECT, BaseServerManager.ADMIN_CONNECT_ENDPOINT)
63 self.
eeManagerServer = connectionBuilderLight.build(TRANSPORT_CONSTS.SERVER_CONNECT,
"EEManager")
66 self.
tsus.pollTimeout = 100
76 self.
tsus.exit_flag =
True 80 for conn
in self.
tsus.connections.itervalues():
98 "node":"node for test", 106 self.assertEqual(zmq.POLLIN, pollResult & zmq.POLLIN)
108 self.assertEqual(event.eventType, DTM_CONSTS.EVENT_TYPES.UPDATE_TASK_FIELDS)
109 self.assertTrue(isinstance(event.eventObj, UpdateTaskFields))
110 self.assertEqual(event.eventObj.id,
"123")
111 self.assertEqual(event.eventObj.fields[
"state"], 0)
112 self.assertEqual(event.eventObj.fields[
"pId"], 1186)
113 self.assertEqual(event.eventObj.fields[
"nodeName"],
"node for test")
119 "error_message": "error message for test", 130 "node":"node for test", 138 if pollResult & zmq.POLLIN == zmq.POLLIN:
140 self.assertNotEqual(event.eventType, DTM_CONSTS.EVENT_TYPES.UPDATE_TASK_FIELDS)
143 allAvailableTaskIds = [
"1",
"2"]
144 TasksStateUpdateService.FETCH_TASKS_IDS_INTERVAL = 1
147 time.sleep(TasksStateUpdateService.FETCH_TASKS_IDS_INTERVAL)
149 self.assertEqual(zmq.POLLIN, zmq.POLLIN & pollResult)
151 self.assertEqual(event.eventType, DTM_CONSTS.EVENT_TYPES.FETCH_AVAILABLE_TASK_IDS)
153 responseEvent = self.
eventBuilder.build(DTM_CONSTS.EVENT_TYPES.AVAILABLE_TASK_IDS_RESPONSE, res)
154 responseEvent.connect_identity = event.connect_identity
160 self.assertEqual(zmq.POLLIN, zmq.POLLIN & pollResult)
162 self.assertEqual(event.eventType, DTM_CONSTS.EVENT_TYPES.CHECK_TASK_STATE)
163 self.assertTrue(isinstance(event.eventObj, CheckTaskState))
164 firstId = event.eventObj.id
165 self.assertIn(firstId, allAvailableTaskIds)
168 time.sleep(TasksStateUpdateService.FETCH_TASKS_IDS_INTERVAL)
170 self.assertEqual(zmq.POLLIN, zmq.POLLIN & pollResult)
172 self.assertEqual(event.eventType, DTM_CONSTS.EVENT_TYPES.FETCH_AVAILABLE_TASK_IDS)
174 responseEvent = self.
eventBuilder.build(DTM_CONSTS.EVENT_TYPES.AVAILABLE_TASK_IDS_RESPONSE, res)
175 responseEvent.connect_identity = event.connect_identity
180 self.assertEqual(zmq.POLLIN, zmq.POLLIN & pollResult)
182 self.assertEqual(event.eventType, DTM_CONSTS.EVENT_TYPES.CHECK_TASK_STATE)
183 self.assertTrue(isinstance(event.eventObj, CheckTaskState))
184 secondId = event.eventObj.id
185 self.assertIn(secondId, allAvailableTaskIds)
187 self.assertNotEqual(firstId, secondId)
190 time.sleep(TasksStateUpdateService.FETCH_TASKS_IDS_INTERVAL)
192 self.assertEqual(zmq.POLLIN, zmq.POLLIN & pollResult)
194 self.assertEqual(event.eventType, DTM_CONSTS.EVENT_TYPES.FETCH_AVAILABLE_TASK_IDS)
196 responseEvent = self.
eventBuilder.build(DTM_CONSTS.EVENT_TYPES.AVAILABLE_TASK_IDS_RESPONSE, res)
197 responseEvent.connect_identity = event.connect_identity
203 self.assertEqual(zmq.POLLIN, zmq.POLLIN & pollResult)
205 self.assertEqual(event.eventType, DTM_CONSTS.EVENT_TYPES.CHECK_TASK_STATE)
206 self.assertTrue(isinstance(event.eventObj, CheckTaskState))
207 self.assertIn(event.eventObj.id, allAvailableTaskIds)
209 if __name__ ==
"__main__":
AvailableTaskIds event object, for return all available task id.
def testUpdateStateError(self)
The TasksStateUpdateService class, is a listener of tasks state updates from DRCE FO of cluster nodes...
Class hides routines of bulding connection objects.
It's a wrapper similar to zmsg.hpp in sense of encapsulation of hce message structure.
The builder is used to encapsulation routine of creation various type of connections.
IDGenerator is used to generate unique id for connections.
Convertor which used to convert Task*Reques to json and TaskResponse from json.
def testUpdateStateNomal(self)
def __init__(self, args, kvargs)