2 Created on Feb 26, 2014 7 from mock
import MagicMock, ANY
25 FORMAT =
'%(asctime)s - %(thread)ld - %(threadName)s - %(name)s - %(funcName)s - %(levelname)s - %(message)s' 26 logging.basicConfig(level=logging.DEBUG, format=FORMAT)
39 return first.uid == second.uid
and first.eventType == second.eventType
and\
40 first.eventObj == second.eventObj
44 print first.eventObj.fields
45 print second.eventObj.fields
46 return first.uid == second.uid
and first.eventType == second.eventType
and\
47 first.eventObj.fields == second.eventObj.fields
51 return first.eventType == second.eventType
and first.eventObj.__dict__ == second.eventObj.__dict__
85 handler_mock = MagicMock()
90 handler_mock.assert_called_once_with(self.
event)
94 unhandled_mock = MagicMock()
99 unhandled_mock.assert_called_once_with(self.
event)
112 reply_event = self.
event_builder.build(EVENT_TYPES.NEW_TASK,
"new obj")
113 expect_event = reply_event
114 expect_event.uid = self.
event.uid
116 match_event =
Matcher(matchEvents, expect_event)
119 self.
connect_mock.send.assert_called_once_with(match_event)
126 handler_mock = MagicMock()
132 handler_mock.assert_called_once_with(self.
event)
137 on_timeout_mock = MagicMock()
142 on_timeout_mock.assert_called_once_with()
151 with self.assertRaises(TransportInternalErr):
157 raw_server =
"raw_server" 158 response =
Response([
"sock_identity",
"id",
"body"])
159 connect_mock = MagicMock(Connection)
160 connect_mock.recv.return_value = response
164 event_processor_mock = MagicMock()
166 expect_event = self.
base_server_manager.eventBuilder.build(EVENT_TYPES.SERVER_TCP_RAW, response)
167 match_event =
Matcher(match_tcp_raw_events, expect_event)
173 event_processor_mock.assert_called_once_with(match_event)
177 connectLightBuilder_mock = MagicMock(ConnectionBuilderLight)
180 connectLightBuilder_mock.build.assert_called_once_with(ANY, ANY)
184 connectLightBuilder_mock = MagicMock(ConnectionBuilderLight)
185 adminConnect_mock = MagicMock(ConnectionLight)
187 admin_connection=adminConnect_mock)
188 self.assertEqual(connectLightBuilder_mock.build.call_count, 0,
"was called")
192 connectLightBuilder_mock = MagicMock(ConnectionBuilderLight)
196 connect_mock = MagicMock(ConnectionLight)
197 connect_mock.recv.return_value = adminEvent
205 connect_mock.send.assert_called_once_with(ANY)
209 connectLightBuilder_mock = MagicMock(ConnectionBuilderLight)
212 self.
adminState.command = AdminState.STATE_NOP
214 connect_mock = MagicMock(ConnectionLight)
215 connect_mock.recv.return_value = adminEvent
223 connect_mock.send.assert_called_once_with(ANY)
227 connectLightBuilder_mock = MagicMock(ConnectionBuilderLight)
230 self.
adminState.className =
"AdminState.STATE_NOP" 232 connect_mock = MagicMock(ConnectionLight)
233 connect_mock.recv.return_value = adminEvent
241 connect_mock.send.assert_called_once_with(ANY)
246 ready =
AdminState(className, AdminState.STATE_READY)
247 readyEvent = self.
base_server_manager.eventBuilder.build(EVENT_TYPES.ADMIN_STATE_RESPONSE, ready)
249 admin_connect_name = BaseServerManager.ADMIN_CONNECT_CLIENT
250 self.assertTrue(self.
base_server_manager.statFields[admin_connect_name +
"_send_cnt"] == 1,
"")
251 self.assertTrue(self.
base_server_manager.statFields[admin_connect_name +
"_recv_cnt"] == 0,
"")
252 self.assertTrue(self.
base_server_manager.statFields[admin_connect_name +
"_send_bytes"] == sys.getsizeof(readyEvent),
"")
253 self.assertTrue(self.
base_server_manager.statFields[admin_connect_name +
"_recv_bytes"] == 0,
"")
257 fake_user_connect =
None 258 connect_name =
"user_connect" 263 self.assertTrue(self.
base_server_manager.statFields[connect_name +
"_send_bytes"] == 0,
"")
264 self.assertTrue(self.
base_server_manager.statFields[connect_name +
"_recv_bytes"] == 0,
"")
268 msg_size = sys.getsizeof(self.
event)
269 fake_user_connect =
None 270 connect_name =
"user_connect" 271 self.
event.connect_name = connect_name
279 self.assertTrue(self.
base_server_manager.statFields[connect_name +
"_send_bytes"] == 0,
"")
280 self.assertTrue(self.
base_server_manager.statFields[connect_name +
"_recv_bytes"] == 2 * msg_size,
"")
284 self.
event.connect_name =
"bad_stat_name" 290 connectLightBuilder_mock = MagicMock(ConnectionBuilderLight)
294 adminStatEvent = self.
event_builder.build(EVENT_TYPES.ADMIN_FETCH_STAT_DATA, adminStatData)
295 connect_mock = MagicMock(ConnectionLight)
296 connect_mock.recv.return_value = adminStatEvent
302 expect_event = self.
event_builder.build(EVENT_TYPES.ADMIN_FETCH_STAT_DATA_RESPONSE, expectAdminStatData)
303 expect_event.uid = adminStatEvent.uid
304 match_event =
Matcher(matchStatEvents, expect_event)
308 connect_mock.send.assert_called_once_with(match_event)
312 connectLightBuilder_mock = MagicMock(ConnectionBuilderLight)
315 adminStatData =
AdminStatData(
"BaseServerManager", dict({
"Admin_recv_cnt":0,
"WrongName":0}))
316 adminStatEvent = self.
event_builder.build(EVENT_TYPES.ADMIN_FETCH_STAT_DATA, adminStatData)
317 connect_mock = MagicMock(ConnectionLight)
318 connect_mock.recv.return_value = adminStatEvent
324 expectAdminStatData =
AdminStatData(
"BaseServerManager", dict({
"Admin_recv_cnt":1,
"WrongName":0}))
325 expect_event = self.
event_builder.build(EVENT_TYPES.ADMIN_FETCH_STAT_DATA_RESPONSE, expectAdminStatData)
326 expect_event.uid = adminStatEvent.uid
327 match_event =
Matcher(matchStatEvents, expect_event)
331 connect_mock.send.assert_called_once_with(match_event)
def test__admin_fetch_stat_data_all(self)
def test_correct_add_new_connection(self)
def matchStatEvents(first, second)
def test_get_client_admin_event(self)
It's a wrapper similar to zmsg.hpp in the sense that it encapsulates the hce response message structure...
This is the app base class for management server connection end-points and parallel transport messages pr...
def test_propagate_out_TransportInternalErr(self)
def test_call_unhandled_event_handler(self)
def test_admin_event_broken_class_name(self)
AdminState event object, used for admin management commands that change the application state, like shutdown...
def test_set__event_handler(self)
def test_stats_update_invalid_stat_key(self)
Class that hides the routines for building connection objects.
def test_call_on_timeout_callback(self)
def test_stats_recv_several_msg(self)
def test_admin_event_broken_command_type(self)
def test_init_user_connection(self)
AdminStatData event object, used by admin to fetch stat fields and possible data from any threaded classes i...
def test_init_stat_fields_admin_connect(self)
def __init__(self, compare, some_obj)
def matchEvents(first, second)
def test_create_client_admin_connect(self)
def test_create_server_connect(self)
def test_process_response_tcp_raw(self)
def test_run_one_loop(self)
def match_tcp_raw_events(first, second)
def test__admin_fetch_stat_data_some_fields(self)
def test_send_event(self)