HCE Project Python language Distributed Tasks Manager Application, Distributed Crawler Application and client API bindings.  2.0.0-chaika
Hierarchical Cluster Engine Python language binding
tests.test_dtm_TasksStateUpdateService.TestTasksStateUpdateService Class Reference
Inheritance diagram for tests.test_dtm_TasksStateUpdateService.TestTasksStateUpdateService:
Collaboration diagram for tests.test_dtm_TasksStateUpdateService.TestTasksStateUpdateService:

Public Member Functions

def __init__ (self, args, kvargs)
 
def setUp (self)
 
def tearDown (self)
 
def testUpdateStateNomal (self)
 
def testUpdateStateError (self)
 
def testCheckState (self)
 

Public Attributes

 eventBuilder
 
 inited
 
 adminServer
 
 taskManagerConnServer
 
 eeManagerServer
 
 tsus
 
 hceNode
 
 drceCommandConvertor
 

Static Public Attributes

string CONFIG_SECTION = "TasksStateUpdateService"
 
string SERVICE_BIND_IP = "127.0.0.1"
 
string SERVICE_BIND_PORT = "5500"
 

Detailed Description

Definition at line 34 of file test_dtm_TasksStateUpdateService.py.

Constructor & Destructor Documentation

◆ __init__()

def tests.test_dtm_TasksStateUpdateService.TestTasksStateUpdateService.__init__ (   self,
  args,
  kvargs 
)

Definition at line 41 of file test_dtm_TasksStateUpdateService.py.

41  def __init__(self, *args, **kvargs):
42  unittest.TestCase.__init__(self, *args, **kvargs)
43  self.eventBuilder = EventBuilder()
44 
def __init__(self)
constructor
Definition: UIDGenerator.py:19

Member Function Documentation

◆ setUp()

def tests.test_dtm_TasksStateUpdateService.TestTasksStateUpdateService.setUp (   self)

Definition at line 45 of file test_dtm_TasksStateUpdateService.py.

45  def setUp(self):
46  if not hasattr(self, "inited"):
47  self.inited = True
48 
49  config = ConfigParser.RawConfigParser()
50  config.add_section(self.CONFIG_SECTION)
51  config.set(self.CONFIG_SECTION, "clientTasksManager", "TasksManager")
52  config.set(self.CONFIG_SECTION, "clientExecutionEnvironmentManager","EEManager")
53  config.set(self.CONFIG_SECTION, "serverHost", self.SERVICE_BIND_IP)
54  config.set(self.CONFIG_SECTION, "serverPort", self.SERVICE_BIND_PORT)
55 
56  #Dependent objects creation
57  idGenerator = IDGenerator()
58  connectionBuilderLight = ConnectionBuilderLight()
59  connectionBuilder = ConnectionBuilder(idGenerator)
60  #create AdminServer
61  self.adminServer = connectionBuilderLight.build(TRANSPORT_CONSTS.SERVER_CONNECT, BaseServerManager.ADMIN_CONNECT_ENDPOINT)
62  self.taskManagerConnServer = connectionBuilderLight.build(TRANSPORT_CONSTS.SERVER_CONNECT, "TasksManager")
63  self.eeManagerServer = connectionBuilderLight.build(TRANSPORT_CONSTS.SERVER_CONNECT, "EEManager")
64  # time.sleep(1)
65  self.tsus = TasksStateUpdateService(config)
66  self.tsus.pollTimeout = 100
67  # start the threaded server
68  self.tsus.start()
69  self.hceNode = connectionBuilder.build(TRANSPORT_CONSTS.DATA_CONNECT_TYPE, ConnectionParams(self.SERVICE_BIND_IP, self.SERVICE_BIND_PORT), TRANSPORT_CONSTS.CLIENT_CONNECT)
70  self.drceCommandConvertor = CommandConvertor()
71 

◆ tearDown()

def tests.test_dtm_TasksStateUpdateService.TestTasksStateUpdateService.tearDown (   self)

Definition at line 72 of file test_dtm_TasksStateUpdateService.py.

72  def tearDown(self):
73  self.hceNode.close()
74  self.adminServer.close()
75  self.eeManagerServer.close()
76  self.tsus.exit_flag = True
77  time.sleep(0.3)
78  self.taskManagerConnServer.close()
79  time.sleep(0.2)
80  for conn in self.tsus.connections.itervalues():
81  conn.close()
82 

◆ testCheckState()

def tests.test_dtm_TasksStateUpdateService.TestTasksStateUpdateService.testCheckState (   self)

Definition at line 142 of file test_dtm_TasksStateUpdateService.py.

142  def testCheckState(self):
143  allAvailableTaskIds = ["1","2"]
144  TasksStateUpdateService.FETCH_TASKS_IDS_INTERVAL = 1
145 
146  #test send CheckTaskState
147  time.sleep(TasksStateUpdateService.FETCH_TASKS_IDS_INTERVAL)
148  pollResult = self.taskManagerConnServer.poll(0.1)
149  self.assertEqual(zmq.POLLIN, zmq.POLLIN & pollResult)
150  event = self.taskManagerConnServer.recv()
151  self.assertEqual(event.eventType, DTM_CONSTS.EVENT_TYPES.FETCH_AVAILABLE_TASK_IDS)
152  res = AvailableTaskIds(allAvailableTaskIds)
153  responseEvent = self.eventBuilder.build(DTM_CONSTS.EVENT_TYPES.AVAILABLE_TASK_IDS_RESPONSE, res)
154  responseEvent.connect_identity = event.connect_identity
155  self.taskManagerConnServer.send(responseEvent)
156  time.sleep(0.1)
157 
158  #check the EEManager received the CheckTaskState
159  pollResult = self.eeManagerServer.poll(0.1)
160  self.assertEqual(zmq.POLLIN, zmq.POLLIN & pollResult)
161  event = self.eeManagerServer.recv()
162  self.assertEqual(event.eventType, DTM_CONSTS.EVENT_TYPES.CHECK_TASK_STATE)
163  self.assertTrue(isinstance(event.eventObj, CheckTaskState))
164  firstId = event.eventObj.id
165  self.assertIn(firstId, allAvailableTaskIds)
166 
167  #test send CheckTaskState again
168  time.sleep(TasksStateUpdateService.FETCH_TASKS_IDS_INTERVAL)
169  pollResult = self.taskManagerConnServer.poll(0.1)
170  self.assertEqual(zmq.POLLIN, zmq.POLLIN & pollResult)
171  event = self.taskManagerConnServer.recv()
172  self.assertEqual(event.eventType, DTM_CONSTS.EVENT_TYPES.FETCH_AVAILABLE_TASK_IDS)
173  res = AvailableTaskIds(allAvailableTaskIds)
174  responseEvent = self.eventBuilder.build(DTM_CONSTS.EVENT_TYPES.AVAILABLE_TASK_IDS_RESPONSE, res)
175  responseEvent.connect_identity = event.connect_identity
176  self.taskManagerConnServer.send(responseEvent)
177  time.sleep(0.1)
178 
179  pollResult = self.eeManagerServer.poll(0.1)
180  self.assertEqual(zmq.POLLIN, zmq.POLLIN & pollResult)
181  event = self.eeManagerServer.recv()
182  self.assertEqual(event.eventType, DTM_CONSTS.EVENT_TYPES.CHECK_TASK_STATE)
183  self.assertTrue(isinstance(event.eventObj, CheckTaskState))
184  secondId = event.eventObj.id
185  self.assertIn(secondId, allAvailableTaskIds)
186  #test the duplicate task avoiding
187  self.assertNotEqual(firstId, secondId)
188 
189  #test send CheckTaskState again, all task sent, then it clear the sent records
190  time.sleep(TasksStateUpdateService.FETCH_TASKS_IDS_INTERVAL)
191  pollResult = self.taskManagerConnServer.poll(0.1)
192  self.assertEqual(zmq.POLLIN, zmq.POLLIN & pollResult)
193  event = self.taskManagerConnServer.recv()
194  self.assertEqual(event.eventType, DTM_CONSTS.EVENT_TYPES.FETCH_AVAILABLE_TASK_IDS)
195  res = AvailableTaskIds(allAvailableTaskIds)
196  responseEvent = self.eventBuilder.build(DTM_CONSTS.EVENT_TYPES.AVAILABLE_TASK_IDS_RESPONSE, res)
197  responseEvent.connect_identity = event.connect_identity
198  self.taskManagerConnServer.send(responseEvent)
199  time.sleep(0.1)
200 
201  #check the EEManager received the CheckTaskState
202  pollResult = self.eeManagerServer.poll(0.1)
203  self.assertEqual(zmq.POLLIN, zmq.POLLIN & pollResult)
204  event = self.eeManagerServer.recv()
205  self.assertEqual(event.eventType, DTM_CONSTS.EVENT_TYPES.CHECK_TASK_STATE)
206  self.assertTrue(isinstance(event.eventObj, CheckTaskState))
207  self.assertIn(event.eventObj.id, allAvailableTaskIds)
208 

◆ testUpdateStateError()

def tests.test_dtm_TasksStateUpdateService.TestTasksStateUpdateService.testUpdateStateError (   self)

Definition at line 115 of file test_dtm_TasksStateUpdateService.py.

115  def testUpdateStateError(self):
116  req = Request("1")
117  req.add_data('''[{
118  "error_code": 3,
119  "error_message": "error message for test",
120  "id": "1",
121  "type": 0,
122  "host": "",
123  "port": 0,
124  "state": 0,
125  "pid": 1186,
126  "stdout": "",
127  "stderror": "",
128  "exit_status": 0,
129  "files": [],
130  "node":"node for test",
131  "time": 0
132  }]
133  ''')
134  self.hceNode.send(req)
135  time.sleep(0.3)
136  pollResult = self.taskManagerConnServer.poll()
137  #TasksStateService don't send message to TaskManager
138  if pollResult & zmq.POLLIN == zmq.POLLIN:
139  event = self.taskManagerConnServer.recv()
140  self.assertNotEqual(event.eventType, DTM_CONSTS.EVENT_TYPES.UPDATE_TASK_FIELDS)
141 

◆ testUpdateStateNomal()

def tests.test_dtm_TasksStateUpdateService.TestTasksStateUpdateService.testUpdateStateNomal (   self)

Definition at line 83 of file test_dtm_TasksStateUpdateService.py.

83  def testUpdateStateNomal(self):
84  req = Request("1")
85  req.add_data('''[{
86  "error_code": 0,
87  "error_message": "",
88  "id": "123",
89  "type": 0,
90  "host": "",
91  "port": 0,
92  "state": 0,
93  "pid": 1186,
94  "stdout": "",
95  "stderror": "",
96  "exit_status": 0,
97  "files": [],
98  "node":"node for test",
99  "time": 0
100  }]
101  ''')
102  self.hceNode.send(req)
103  time.sleep(0.3)
104  #TasksStateUpdateService send UpdateTaskFields message to TaskManager
105  pollResult = self.taskManagerConnServer.poll()
106  self.assertEqual(zmq.POLLIN, pollResult & zmq.POLLIN)
107  event = self.taskManagerConnServer.recv()
108  self.assertEqual(event.eventType, DTM_CONSTS.EVENT_TYPES.UPDATE_TASK_FIELDS)
109  self.assertTrue(isinstance(event.eventObj, UpdateTaskFields))
110  self.assertEqual(event.eventObj.id, "123")
111  self.assertEqual(event.eventObj.fields["state"], 0)
112  self.assertEqual(event.eventObj.fields["pId"], 1186)
113  self.assertEqual(event.eventObj.fields["nodeName"], "node for test")
114 

Member Data Documentation

◆ adminServer

tests.test_dtm_TasksStateUpdateService.TestTasksStateUpdateService.adminServer

Definition at line 61 of file test_dtm_TasksStateUpdateService.py.

◆ CONFIG_SECTION

string tests.test_dtm_TasksStateUpdateService.TestTasksStateUpdateService.CONFIG_SECTION = "TasksStateUpdateService"
static

Definition at line 37 of file test_dtm_TasksStateUpdateService.py.

◆ drceCommandConvertor

tests.test_dtm_TasksStateUpdateService.TestTasksStateUpdateService.drceCommandConvertor

Definition at line 70 of file test_dtm_TasksStateUpdateService.py.

◆ eeManagerServer

tests.test_dtm_TasksStateUpdateService.TestTasksStateUpdateService.eeManagerServer

Definition at line 63 of file test_dtm_TasksStateUpdateService.py.

◆ eventBuilder

tests.test_dtm_TasksStateUpdateService.TestTasksStateUpdateService.eventBuilder

Definition at line 43 of file test_dtm_TasksStateUpdateService.py.

◆ hceNode

tests.test_dtm_TasksStateUpdateService.TestTasksStateUpdateService.hceNode

Definition at line 69 of file test_dtm_TasksStateUpdateService.py.

◆ inited

tests.test_dtm_TasksStateUpdateService.TestTasksStateUpdateService.inited

Definition at line 47 of file test_dtm_TasksStateUpdateService.py.

◆ SERVICE_BIND_IP

string tests.test_dtm_TasksStateUpdateService.TestTasksStateUpdateService.SERVICE_BIND_IP = "127.0.0.1"
static

Definition at line 38 of file test_dtm_TasksStateUpdateService.py.

◆ SERVICE_BIND_PORT

string tests.test_dtm_TasksStateUpdateService.TestTasksStateUpdateService.SERVICE_BIND_PORT = "5500"
static

Definition at line 39 of file test_dtm_TasksStateUpdateService.py.

◆ taskManagerConnServer

tests.test_dtm_TasksStateUpdateService.TestTasksStateUpdateService.taskManagerConnServer

Definition at line 62 of file test_dtm_TasksStateUpdateService.py.

◆ tsus

tests.test_dtm_TasksStateUpdateService.TestTasksStateUpdateService.tsus

Definition at line 65 of file test_dtm_TasksStateUpdateService.py.


The documentation for this class was generated from the following file: