HCE Project Python language Distributed Tasks Manager Application, Distributed Crawler Application and client API bindings.  2.0.0-chaika
Hierarchical Cluster Engine Python language binding
test_app_BaseServerManager.py
Go to the documentation of this file.
1 '''
2 Created on Feb 26, 2014
3 
4 @author: igor
5 '''
6 import unittest
7 from mock import MagicMock, ANY
8 import sys
9 import zmq
10 
11 from app.BaseServerManager import BaseServerManager
12 from app.PollerManager import PollerManager
13 from transport.Event import EventBuilder
14 from transport.ConnectionBuilderLight import ConnectionBuilderLight
15 from transport.ConnectionLight import ConnectionLight
16 from transport.Connection import ConnectionTimeout
17 from transport.Connection import TransportInternalErr
18 from transport.Response import Response
19 from transport.Connection import Connection
20 import transport.Consts as transport_const
21 from dtm.Constants import EVENT_TYPES
22 from dtm.EventObjects import AdminState, AdminStatData
23 
24 import logging
25 FORMAT = '%(asctime)s - %(thread)ld - %(threadName)s - %(name)s - %(funcName)s - %(levelname)s - %(message)s'
26 logging.basicConfig(level=logging.DEBUG, format=FORMAT) #basic logging configuration
27 
class Matcher(object):
    '''Equality adapter used as an argument to mock call assertions.

    Wraps an expected object together with a two-argument compare callable.
    Comparing the matcher with another object delegates to
    ``compare(some_obj, other)``, so a mock assertion such as
    ``assert_called_once_with(Matcher(cmp, expected))`` succeeds when the
    callable accepts the actually passed argument.
    '''

    def __init__(self, compare, some_obj):
        '''Store the compare callable and the expected object.

        compare  -- callable taking (expected, actual), returns a truthy
                    value when they are considered equal
        some_obj -- the expected object handed to ``compare`` as first argument
        '''
        self.compare = compare
        self.some_obj = some_obj

    def __eq__(self, other):
        '''Return whatever ``compare(some_obj, other)`` returns.'''
        return self.compare(self.some_obj, other)

    def __ne__(self, other):
        '''Logical inverse of ``__eq__``.

        Python 2 does not derive ``!=`` from ``__eq__``; without this method
        ``matcher != x`` would ignore the compare callable entirely.
        '''
        return not self.__eq__(other)

    def __repr__(self):
        '''Show the wrapped callable and object in assertion failure messages.'''
        return "Matcher(%r, %r)" % (self.compare, self.some_obj)
36 
37 
def matchEvents(first, second):
    '''Compare two events by uid, eventType and eventObj.

    The checks run in that order and stop at the first one that fails;
    the result of the last evaluated comparison is returned.
    '''
    uid_equal = first.uid == second.uid
    if not uid_equal:
        return uid_equal

    type_equal = first.eventType == second.eventType
    if not type_equal:
        return type_equal

    return first.eventObj == second.eventObj
41 
42 
def matchStatEvents(first, second):
    '''Compare two stat events by uid, eventType and eventObj.fields.

    Both ``fields`` values are printed first so a mismatch can be diagnosed
    from the test output. The single-argument parenthesised print form is
    used because it produces the same output on Python 2 and is also valid
    Python 3 syntax.
    '''
    print(first.eventObj.fields)
    print(second.eventObj.fields)
    return (first.uid == second.uid
            and first.eventType == second.eventType
            and first.eventObj.fields == second.eventObj.fields)
48 
49 
def match_tcp_raw_events(first, second):
    '''Compare two events by eventType and by the attribute dicts of eventObj.

    The uid is not part of the comparison. The attribute dicts are only
    compared when the event types are equal.
    '''
    same_type = first.eventType == second.eventType
    if not same_type:
        return same_type
    return first.eventObj.__dict__ == second.eventObj.__dict__
52 
53 
54 class TestBaseServerManager(unittest.TestCase):
55 
56 
57  def setUp(self):
59  self.adminServerFake = self.connectionBuilderLight.build(transport_const.SERVER_CONNECT, BaseServerManager.ADMIN_CONNECT_ENDPOINT)
60 
61  self.sock_mock = MagicMock(spec = zmq.Socket)
62  self.poller_mock = MagicMock(spec = zmq.Poller)
63  self.poller_manager_mock = MagicMock(spec = PollerManager)
64  self.connect_mock = MagicMock(spec = ConnectionLight)
67  self.event = self.event_builder.build(EVENT_TYPES.NEW_TASK, "eventObj")
68  self.connect_name = "Admin"
69  self.event.connect_name = self.connect_name
70  self.adminState = AdminState("BaseServerManager", AdminState.STATE_SHUTDOWN)
71 
72 
73  def tearDown(self):
74  self.adminServerFake.close()
75 
76 
78  connect = ConnectionLight(self.sock_mock, transport_const.CLIENT_CONNECT)
79  self.base_server_manager.addConnection(self.connect_name, connect)
80 
81  self.assertEqual(self.base_server_manager.connections[self.connect_name], connect, "")
82 
83 
85  handler_mock = MagicMock()
86  self.base_server_manager.setEventHandler(EVENT_TYPES.NEW_TASK, handler_mock)
87 
89 
90  handler_mock.assert_called_once_with(self.event)
91 
92 
94  unhandled_mock = MagicMock()
95  self.base_server_manager.on_unhandled_event = unhandled_mock
96 
98 
99  unhandled_mock.assert_called_once_with(self.event)
100 
101 
102  def test_send_event(self):
103  self.base_server_manager.addConnection(self.connect_name, self.connect_mock)
104  self.base_server_manager.send(self.connect_name,self. event)
105 
106  self.connect_mock.send.assert_called_once_with(self.event)
107 
108 
109  def test_reply(self):
110  self.base_server_manager.addConnection(self.connect_name, self.connect_mock)
111 
112  reply_event = self.event_builder.build(EVENT_TYPES.NEW_TASK, "new obj")
113  expect_event = reply_event
114  expect_event.uid = self.event.uid
115 
116  match_event = Matcher(matchEvents, expect_event)
117  self.base_server_manager.reply(self.event, reply_event)
118 
119  self.connect_mock.send.assert_called_once_with(match_event)
120 
121 
122  def test_run_one_loop(self):
123  self.poller_manager_mock.poll.return_value = [self.connect_name]
124  self.connect_mock.recv.return_value = self.event
125 
126  handler_mock = MagicMock()
127  self.base_server_manager.setEventHandler(EVENT_TYPES.NEW_TASK, handler_mock)
128  self.base_server_manager.addConnection(self.connect_name, self.connect_mock)
129 
130  self.base_server_manager.poll()
131 
132  handler_mock.assert_called_once_with(self.event)
133 
134 
136  self.poller_manager_mock.poll.side_effect = ConnectionTimeout("boom")
137  on_timeout_mock = MagicMock()
138  self.base_server_manager.on_poll_timeout = on_timeout_mock
139 
140  self.base_server_manager.poll()
141 
142  on_timeout_mock.assert_called_once_with()
143 
144 
146  self.poller_manager_mock.poll.return_value = [self.connect_name]
147  self.connect_mock.recv.side_effect = TransportInternalErr("boom")
148 
149  self.base_server_manager.addConnection(self.connect_name, self.connect_mock)
150 
151  with self.assertRaises(TransportInternalErr):
152  list(self.base_server_manager.poll())
153 
154 
155 
157  raw_server = "raw_server"
158  response = Response(["sock_identity", "id", "body"])
159  connect_mock = MagicMock(Connection)
160  connect_mock.recv.return_value = response
161  self.poller_manager_mock.poll.return_value = [raw_server]
162 
163  self.base_server_manager.addConnection(raw_server, connect_mock)
164  event_processor_mock = MagicMock()
165 
166  expect_event = self.base_server_manager.eventBuilder.build(EVENT_TYPES.SERVER_TCP_RAW, response)
167  match_event = Matcher(match_tcp_raw_events, expect_event)
168 
169  self.base_server_manager.setEventHandler(EVENT_TYPES.SERVER_TCP_RAW, event_processor_mock)
170 
171  self.base_server_manager.poll()
172 
173  event_processor_mock.assert_called_once_with(match_event)
174 
175 
177  connectLightBuilder_mock = MagicMock(ConnectionBuilderLight)
178  self.base_server_manager = BaseServerManager(self.poller_manager_mock, conectionLightBuilder=connectLightBuilder_mock)
179 
180  connectLightBuilder_mock.build.assert_called_once_with(ANY, ANY)
181 
182 
184  connectLightBuilder_mock = MagicMock(ConnectionBuilderLight)
185  adminConnect_mock = MagicMock(ConnectionLight)
186  self.base_server_manager = BaseServerManager(self.poller_manager_mock, conectionLightBuilder=connectLightBuilder_mock,
187  admin_connection=adminConnect_mock)
188  self.assertEqual(connectLightBuilder_mock.build.call_count, 0, "was called")
189 
190 
192  connectLightBuilder_mock = MagicMock(ConnectionBuilderLight)
193  self.base_server_manager = BaseServerManager(self.poller_manager_mock, conectionLightBuilder=connectLightBuilder_mock)
194 
195  adminEvent = self.event_builder.build(EVENT_TYPES.ADMIN_STATE, self.adminState)
196  connect_mock = MagicMock(ConnectionLight)
197  connect_mock.recv.return_value = adminEvent
198 
199  self.poller_manager_mock.poll.return_value = [self.base_server_manager.ADMIN_CONNECT_CLIENT]
200  self.base_server_manager.addConnection(self.base_server_manager.ADMIN_CONNECT_CLIENT, connect_mock)
201 
202  self.base_server_manager.poll()
203 
204  self.assertEqual(self.base_server_manager.exit_flag, True, "still running")
205  connect_mock.send.assert_called_once_with(ANY)
206 
207 
209  connectLightBuilder_mock = MagicMock(ConnectionBuilderLight)
210  self.base_server_manager = BaseServerManager(self.poller_manager_mock, conectionLightBuilder=connectLightBuilder_mock)
211 
212  self.adminState.command = AdminState.STATE_NOP
213  adminEvent = self.event_builder.build(EVENT_TYPES.ADMIN_STATE, self.adminState)
214  connect_mock = MagicMock(ConnectionLight)
215  connect_mock.recv.return_value = adminEvent
216 
217  self.poller_manager_mock.poll.return_value = [self.base_server_manager.ADMIN_CONNECT_CLIENT]
218  self.base_server_manager.addConnection(self.base_server_manager.ADMIN_CONNECT_CLIENT, connect_mock)
219 
220  self.base_server_manager.poll()
221 
222  self.assertEqual(self.base_server_manager.exit_flag, False, "stopped")
223  connect_mock.send.assert_called_once_with(ANY)
224 
225 
227  connectLightBuilder_mock = MagicMock(ConnectionBuilderLight)
228  self.base_server_manager = BaseServerManager(self.poller_manager_mock, conectionLightBuilder=connectLightBuilder_mock)
229 
230  self.adminState.className = "AdminState.STATE_NOP"
231  adminEvent = self.event_builder.build(EVENT_TYPES.ADMIN_STATE, self.adminState)
232  connect_mock = MagicMock(ConnectionLight)
233  connect_mock.recv.return_value = adminEvent
234 
235  self.poller_manager_mock.poll.return_value = [self.base_server_manager.ADMIN_CONNECT_CLIENT]
236  self.base_server_manager.addConnection(self.base_server_manager.ADMIN_CONNECT_CLIENT, connect_mock)
237 
238  self.base_server_manager.poll()
239 
240  self.assertEqual(self.base_server_manager.exit_flag, False, "stopped")
241  connect_mock.send.assert_called_once_with(ANY)
242 
243 
245  className = self.base_server_manager.__class__.__name__
246  ready = AdminState(className, AdminState.STATE_READY)
247  readyEvent = self.base_server_manager.eventBuilder.build(EVENT_TYPES.ADMIN_STATE_RESPONSE, ready)
248 
249  admin_connect_name = BaseServerManager.ADMIN_CONNECT_CLIENT
250  self.assertTrue(self.base_server_manager.statFields[admin_connect_name + "_send_cnt"] == 1, "")
251  self.assertTrue(self.base_server_manager.statFields[admin_connect_name + "_recv_cnt"] == 0, "")
252  self.assertTrue(self.base_server_manager.statFields[admin_connect_name + "_send_bytes"] == sys.getsizeof(readyEvent), "")
253  self.assertTrue(self.base_server_manager.statFields[admin_connect_name + "_recv_bytes"] == 0, "")
254 
255 
257  fake_user_connect = None
258  connect_name = "user_connect"
259  self.base_server_manager.addConnection(connect_name, fake_user_connect)
260 
261  self.assertTrue(self.base_server_manager.statFields[connect_name + "_send_cnt"] == 0, "")
262  self.assertTrue(self.base_server_manager.statFields[connect_name + "_recv_cnt"] == 0, "")
263  self.assertTrue(self.base_server_manager.statFields[connect_name + "_send_bytes"] == 0, "")
264  self.assertTrue(self.base_server_manager.statFields[connect_name + "_recv_bytes"] == 0, "")
265 
266 
268  msg_size = sys.getsizeof(self.event)
269  fake_user_connect = None
270  connect_name = "user_connect"
271  self.event.connect_name = connect_name
272  self.base_server_manager.addConnection(connect_name, fake_user_connect)
273 
276 
277  self.assertTrue(self.base_server_manager.statFields[connect_name + "_send_cnt"] == 0, "")
278  self.assertTrue(self.base_server_manager.statFields[connect_name + "_recv_cnt"] == 2, "")
279  self.assertTrue(self.base_server_manager.statFields[connect_name + "_send_bytes"] == 0, "")
280  self.assertTrue(self.base_server_manager.statFields[connect_name + "_recv_bytes"] == 2 * msg_size, "")
281 
282 
284  self.event.connect_name = "bad_stat_name"
285 
287 
288 
290  connectLightBuilder_mock = MagicMock(ConnectionBuilderLight)
291  self.base_server_manager = BaseServerManager(self.poller_manager_mock, conectionLightBuilder=connectLightBuilder_mock)
292 
293  adminStatData = AdminStatData("BaseServerManager")
294  adminStatEvent = self.event_builder.build(EVENT_TYPES.ADMIN_FETCH_STAT_DATA, adminStatData)
295  connect_mock = MagicMock(ConnectionLight)
296  connect_mock.recv.return_value = adminStatEvent
297 
298  self.poller_manager_mock.poll.return_value = [self.base_server_manager.ADMIN_CONNECT_CLIENT]
299  self.base_server_manager.addConnection(self.base_server_manager.ADMIN_CONNECT_CLIENT, connect_mock)
300 
301  expectAdminStatData = AdminStatData("BaseServerManager", self.base_server_manager.statFields)
302  expect_event = self.event_builder.build(EVENT_TYPES.ADMIN_FETCH_STAT_DATA_RESPONSE, expectAdminStatData)
303  expect_event.uid = adminStatEvent.uid
304  match_event = Matcher(matchStatEvents, expect_event)
305 
306  self.base_server_manager.poll()
307 
308  connect_mock.send.assert_called_once_with(match_event)
309 
310 
312  connectLightBuilder_mock = MagicMock(ConnectionBuilderLight)
313  self.base_server_manager = BaseServerManager(self.poller_manager_mock, conectionLightBuilder=connectLightBuilder_mock)
314 
315  adminStatData = AdminStatData("BaseServerManager", dict({"Admin_recv_cnt":0, "WrongName":0}))
316  adminStatEvent = self.event_builder.build(EVENT_TYPES.ADMIN_FETCH_STAT_DATA, adminStatData)
317  connect_mock = MagicMock(ConnectionLight)
318  connect_mock.recv.return_value = adminStatEvent
319 
320  self.poller_manager_mock.poll.return_value = [self.base_server_manager.ADMIN_CONNECT_CLIENT]
321 
322  self.base_server_manager.addConnection(self.base_server_manager.ADMIN_CONNECT_CLIENT, connect_mock)
323 
324  expectAdminStatData = AdminStatData("BaseServerManager", dict({"Admin_recv_cnt":1, "WrongName":0}))
325  expect_event = self.event_builder.build(EVENT_TYPES.ADMIN_FETCH_STAT_DATA_RESPONSE, expectAdminStatData)
326  expect_event.uid = adminStatEvent.uid
327  match_event = Matcher(matchStatEvents, expect_event)
328 
329  self.base_server_manager.poll()
330 
331  connect_mock.send.assert_called_once_with(match_event)
332 
It is a wrapper similar to zmsg.hpp in the sense that it encapsulates the HCE response message structure...
Definition: Response.py:20
This is app base class for management server connection end-points and parallel transport messages pr...
AdminState event object, for admin manage change application state commands, like shutdown...
Class hides routines of building connection objects.
AdminStatData event object, for admin fetch stat fields and possible data from any threaded classes i...