◆ setUp()
def tests.test_app_BaseServerManager.TestBaseServerManager.setUp |
( |
|
self | ) |
|
Definition at line 57 of file test_app_BaseServerManager.py.
58 self.connectionBuilderLight = ConnectionBuilderLight()
59 self.adminServerFake = self.connectionBuilderLight.build(transport_const.SERVER_CONNECT, BaseServerManager.ADMIN_CONNECT_ENDPOINT)
61 self.sock_mock = MagicMock(spec = zmq.Socket)
62 self.poller_mock = MagicMock(spec = zmq.Poller)
63 self.poller_manager_mock = MagicMock(spec = PollerManager)
64 self.connect_mock = MagicMock(spec = ConnectionLight)
65 self.base_server_manager = BaseServerManager(self.poller_manager_mock, conectionLightBuilder=self.connectionBuilderLight)
66 self.event_builder = EventBuilder()
67 self.event = self.event_builder.build(EVENT_TYPES.NEW_TASK, "eventObj")
68 self.connect_name = "Admin"
69 self.event.connect_name = self.connect_name
70 self.adminState = AdminState("BaseServerManager", AdminState.STATE_SHUTDOWN)
◆ tearDown()
def tests.test_app_BaseServerManager.TestBaseServerManager.tearDown |
( |
|
self | ) |
|
◆ test__admin_fetch_stat_data_all()
def tests.test_app_BaseServerManager.TestBaseServerManager.test__admin_fetch_stat_data_all |
( |
|
self | ) |
|
Definition at line 289 of file test_app_BaseServerManager.py.
289 def test__admin_fetch_stat_data_all(self):
290 connectLightBuilder_mock = MagicMock(ConnectionBuilderLight)
291 self.base_server_manager = BaseServerManager(self.poller_manager_mock, conectionLightBuilder=connectLightBuilder_mock)
293 adminStatData = AdminStatData("BaseServerManager")
294 adminStatEvent = self.event_builder.build(EVENT_TYPES.ADMIN_FETCH_STAT_DATA, adminStatData)
295 connect_mock = MagicMock(ConnectionLight)
296 connect_mock.recv.return_value = adminStatEvent
298 self.poller_manager_mock.poll.return_value = [self.base_server_manager.ADMIN_CONNECT_CLIENT]
299 self.base_server_manager.addConnection(self.base_server_manager.ADMIN_CONNECT_CLIENT, connect_mock)
301 expectAdminStatData = AdminStatData("BaseServerManager", self.base_server_manager.statFields)
302 expect_event = self.event_builder.build(EVENT_TYPES.ADMIN_FETCH_STAT_DATA_RESPONSE, expectAdminStatData)
303 expect_event.uid = adminStatEvent.uid
304 match_event = Matcher(matchStatEvents, expect_event)
306 self.base_server_manager.poll()
308 connect_mock.send.assert_called_once_with(match_event)
◆ test__admin_fetch_stat_data_some_fields()
def tests.test_app_BaseServerManager.TestBaseServerManager.test__admin_fetch_stat_data_some_fields |
( |
|
self | ) |
|
Definition at line 311 of file test_app_BaseServerManager.py.
311 def test__admin_fetch_stat_data_some_fields(self):
312 connectLightBuilder_mock = MagicMock(ConnectionBuilderLight)
313 self.base_server_manager = BaseServerManager(self.poller_manager_mock, conectionLightBuilder=connectLightBuilder_mock)
315 adminStatData = AdminStatData("BaseServerManager", dict({"Admin_recv_cnt":0, "WrongName":0}))
316 adminStatEvent = self.event_builder.build(EVENT_TYPES.ADMIN_FETCH_STAT_DATA, adminStatData)
317 connect_mock = MagicMock(ConnectionLight)
318 connect_mock.recv.return_value = adminStatEvent
320 self.poller_manager_mock.poll.return_value = [self.base_server_manager.ADMIN_CONNECT_CLIENT]
322 self.base_server_manager.addConnection(self.base_server_manager.ADMIN_CONNECT_CLIENT, connect_mock)
324 expectAdminStatData = AdminStatData("BaseServerManager", dict({"Admin_recv_cnt":1, "WrongName":0}))
325 expect_event = self.event_builder.build(EVENT_TYPES.ADMIN_FETCH_STAT_DATA_RESPONSE, expectAdminStatData)
326 expect_event.uid = adminStatEvent.uid
327 match_event = Matcher(matchStatEvents, expect_event)
329 self.base_server_manager.poll()
331 connect_mock.send.assert_called_once_with(match_event)
◆ test_admin_event_broken_class_name()
def tests.test_app_BaseServerManager.TestBaseServerManager.test_admin_event_broken_class_name |
( |
|
self | ) |
|
Definition at line 226 of file test_app_BaseServerManager.py.
226 def test_admin_event_broken_class_name(self):
227 connectLightBuilder_mock = MagicMock(ConnectionBuilderLight)
228 self.base_server_manager = BaseServerManager(self.poller_manager_mock, conectionLightBuilder=connectLightBuilder_mock)
230 self.adminState.className = "AdminState.STATE_NOP"
231 adminEvent = self.event_builder.build(EVENT_TYPES.ADMIN_STATE, self.adminState)
232 connect_mock = MagicMock(ConnectionLight)
233 connect_mock.recv.return_value = adminEvent
235 self.poller_manager_mock.poll.return_value = [self.base_server_manager.ADMIN_CONNECT_CLIENT]
236 self.base_server_manager.addConnection(self.base_server_manager.ADMIN_CONNECT_CLIENT, connect_mock)
238 self.base_server_manager.poll()
240 self.assertEqual(self.base_server_manager.exit_flag, False, "stopped")
241 connect_mock.send.assert_called_once_with(ANY)
◆ test_admin_event_broken_command_type()
def tests.test_app_BaseServerManager.TestBaseServerManager.test_admin_event_broken_command_type |
( |
|
self | ) |
|
Definition at line 208 of file test_app_BaseServerManager.py.
208 def test_admin_event_broken_command_type(self):
209 connectLightBuilder_mock = MagicMock(ConnectionBuilderLight)
210 self.base_server_manager = BaseServerManager(self.poller_manager_mock, conectionLightBuilder=connectLightBuilder_mock)
212 self.adminState.command = AdminState.STATE_NOP
213 adminEvent = self.event_builder.build(EVENT_TYPES.ADMIN_STATE, self.adminState)
214 connect_mock = MagicMock(ConnectionLight)
215 connect_mock.recv.return_value = adminEvent
217 self.poller_manager_mock.poll.return_value = [self.base_server_manager.ADMIN_CONNECT_CLIENT]
218 self.base_server_manager.addConnection(self.base_server_manager.ADMIN_CONNECT_CLIENT, connect_mock)
220 self.base_server_manager.poll()
222 self.assertEqual(self.base_server_manager.exit_flag, False, "stopped")
223 connect_mock.send.assert_called_once_with(ANY)
◆ test_call_on_timeout_callback()
def tests.test_app_BaseServerManager.TestBaseServerManager.test_call_on_timeout_callback |
( |
|
self | ) |
|
Definition at line 135 of file test_app_BaseServerManager.py.
135 def test_call_on_timeout_callback(self):
136 self.poller_manager_mock.poll.side_effect = ConnectionTimeout("boom")
137 on_timeout_mock = MagicMock()
138 self.base_server_manager.on_poll_timeout = on_timeout_mock
140 self.base_server_manager.poll()
142 on_timeout_mock.assert_called_once_with()
◆ test_call_unhandled_event_handler()
def tests.test_app_BaseServerManager.TestBaseServerManager.test_call_unhandled_event_handler |
( |
|
self | ) |
|
Definition at line 93 of file test_app_BaseServerManager.py.
93 def test_call_unhandled_event_handler(self):
94 unhandled_mock = MagicMock()
95 self.base_server_manager.on_unhandled_event = unhandled_mock
97 self.base_server_manager.process(self.event)
99 unhandled_mock.assert_called_once_with(self.event)
◆ test_correct_add_new_connection()
def tests.test_app_BaseServerManager.TestBaseServerManager.test_correct_add_new_connection |
( |
|
self | ) |
|
Definition at line 77 of file test_app_BaseServerManager.py.
77 def test_correct_add_new_connection(self):
78 connect = ConnectionLight(self.sock_mock, transport_const.CLIENT_CONNECT)
79 self.base_server_manager.addConnection(self.connect_name, connect)
81 self.assertEqual(self.base_server_manager.connections[self.connect_name], connect, "")
◆ test_create_client_admin_connect()
def tests.test_app_BaseServerManager.TestBaseServerManager.test_create_client_admin_connect |
( |
|
self | ) |
|
Definition at line 176 of file test_app_BaseServerManager.py.
176 def test_create_client_admin_connect(self):
177 connectLightBuilder_mock = MagicMock(ConnectionBuilderLight)
178 self.base_server_manager = BaseServerManager(self.poller_manager_mock, conectionLightBuilder=connectLightBuilder_mock)
180 connectLightBuilder_mock.build.assert_called_once_with(ANY, ANY)
◆ test_create_server_connect()
def tests.test_app_BaseServerManager.TestBaseServerManager.test_create_server_connect |
( |
|
self | ) |
|
Definition at line 183 of file test_app_BaseServerManager.py.
183 def test_create_server_connect(self):
184 connectLightBuilder_mock = MagicMock(ConnectionBuilderLight)
185 adminConnect_mock = MagicMock(ConnectionLight)
186 self.base_server_manager = BaseServerManager(self.poller_manager_mock, conectionLightBuilder=connectLightBuilder_mock,
187 admin_connection=adminConnect_mock)
188 self.assertEqual(connectLightBuilder_mock.build.call_count, 0, "was called")
◆ test_get_client_admin_event()
def tests.test_app_BaseServerManager.TestBaseServerManager.test_get_client_admin_event |
( |
|
self | ) |
|
Definition at line 191 of file test_app_BaseServerManager.py.
191 def test_get_client_admin_event(self):
192 connectLightBuilder_mock = MagicMock(ConnectionBuilderLight)
193 self.base_server_manager = BaseServerManager(self.poller_manager_mock, conectionLightBuilder=connectLightBuilder_mock)
195 adminEvent = self.event_builder.build(EVENT_TYPES.ADMIN_STATE, self.adminState)
196 connect_mock = MagicMock(ConnectionLight)
197 connect_mock.recv.return_value = adminEvent
199 self.poller_manager_mock.poll.return_value = [self.base_server_manager.ADMIN_CONNECT_CLIENT]
200 self.base_server_manager.addConnection(self.base_server_manager.ADMIN_CONNECT_CLIENT, connect_mock)
202 self.base_server_manager.poll()
204 self.assertEqual(self.base_server_manager.exit_flag, True, "still running")
205 connect_mock.send.assert_called_once_with(ANY)
◆ test_init_stat_fields_admin_connect()
def tests.test_app_BaseServerManager.TestBaseServerManager.test_init_stat_fields_admin_connect |
( |
|
self | ) |
|
Definition at line 244 of file test_app_BaseServerManager.py.
244 def test_init_stat_fields_admin_connect(self):
245 className = self.base_server_manager.__class__.__name__
246 ready = AdminState(className, AdminState.STATE_READY)
247 readyEvent = self.base_server_manager.eventBuilder.build(EVENT_TYPES.ADMIN_STATE_RESPONSE, ready)
249 admin_connect_name = BaseServerManager.ADMIN_CONNECT_CLIENT
250 self.assertTrue(self.base_server_manager.statFields[admin_connect_name + "_send_cnt"] == 1, "")
251 self.assertTrue(self.base_server_manager.statFields[admin_connect_name + "_recv_cnt"] == 0, "")
252 self.assertTrue(self.base_server_manager.statFields[admin_connect_name + "_send_bytes"] == sys.getsizeof(readyEvent), "")
253 self.assertTrue(self.base_server_manager.statFields[admin_connect_name + "_recv_bytes"] == 0, "")
◆ test_init_user_connection()
def tests.test_app_BaseServerManager.TestBaseServerManager.test_init_user_connection |
( |
|
self | ) |
|
Definition at line 256 of file test_app_BaseServerManager.py.
256 def test_init_user_connection(self):
257 fake_user_connect = None
258 connect_name = "user_connect"
259 self.base_server_manager.addConnection(connect_name, fake_user_connect)
261 self.assertTrue(self.base_server_manager.statFields[connect_name + "_send_cnt"] == 0, "")
262 self.assertTrue(self.base_server_manager.statFields[connect_name + "_recv_cnt"] == 0, "")
263 self.assertTrue(self.base_server_manager.statFields[connect_name + "_send_bytes"] == 0, "")
264 self.assertTrue(self.base_server_manager.statFields[connect_name + "_recv_bytes"] == 0, "")
◆ test_process_response_tcp_raw()
def tests.test_app_BaseServerManager.TestBaseServerManager.test_process_response_tcp_raw |
( |
|
self | ) |
|
Definition at line 156 of file test_app_BaseServerManager.py.
156 def test_process_response_tcp_raw(self):
157 raw_server = "raw_server"
158 response = Response(["sock_identity", "id", "body"])
159 connect_mock = MagicMock(Connection)
160 connect_mock.recv.return_value = response
161 self.poller_manager_mock.poll.return_value = [raw_server]
163 self.base_server_manager.addConnection(raw_server, connect_mock)
164 event_processor_mock = MagicMock()
166 expect_event = self.base_server_manager.eventBuilder.build(EVENT_TYPES.SERVER_TCP_RAW, response)
167 match_event = Matcher(match_tcp_raw_events, expect_event)
169 self.base_server_manager.setEventHandler(EVENT_TYPES.SERVER_TCP_RAW, event_processor_mock)
171 self.base_server_manager.poll()
173 event_processor_mock.assert_called_once_with(match_event)
◆ test_propagate_out_TransportInternalErr()
def tests.test_app_BaseServerManager.TestBaseServerManager.test_propagate_out_TransportInternalErr |
( |
|
self | ) |
|
Definition at line 145 of file test_app_BaseServerManager.py.
145 def test_propagate_out_TransportInternalErr(self):
146 self.poller_manager_mock.poll.return_value = [self.connect_name]
147 self.connect_mock.recv.side_effect = TransportInternalErr("boom")
149 self.base_server_manager.addConnection(self.connect_name, self.connect_mock)
151 with self.assertRaises(TransportInternalErr):
152 list(self.base_server_manager.poll())
◆ test_reply()
def tests.test_app_BaseServerManager.TestBaseServerManager.test_reply |
( |
|
self | ) |
|
Definition at line 109 of file test_app_BaseServerManager.py.
109 def test_reply(self):
110 self.base_server_manager.addConnection(self.connect_name, self.connect_mock)
112 reply_event = self.event_builder.build(EVENT_TYPES.NEW_TASK, "new obj")
113 expect_event = reply_event
114 expect_event.uid = self.event.uid
116 match_event = Matcher(matchEvents, expect_event)
117 self.base_server_manager.reply(self.event, reply_event)
119 self.connect_mock.send.assert_called_once_with(match_event)
◆ test_run_one_loop()
def tests.test_app_BaseServerManager.TestBaseServerManager.test_run_one_loop |
( |
|
self | ) |
|
Definition at line 122 of file test_app_BaseServerManager.py.
122 def test_run_one_loop(self):
123 self.poller_manager_mock.poll.return_value = [self.connect_name]
124 self.connect_mock.recv.return_value = self.event
126 handler_mock = MagicMock()
127 self.base_server_manager.setEventHandler(EVENT_TYPES.NEW_TASK, handler_mock)
128 self.base_server_manager.addConnection(self.connect_name, self.connect_mock)
130 self.base_server_manager.poll()
132 handler_mock.assert_called_once_with(self.event)
◆ test_send_event()
def tests.test_app_BaseServerManager.TestBaseServerManager.test_send_event |
( |
|
self | ) |
|
Definition at line 102 of file test_app_BaseServerManager.py.
102 def test_send_event(self):
103 self.base_server_manager.addConnection(self.connect_name, self.connect_mock)
104 self.base_server_manager.send(self.connect_name, self.event)
106 self.connect_mock.send.assert_called_once_with(self.event)
◆ test_set__event_handler()
def tests.test_app_BaseServerManager.TestBaseServerManager.test_set__event_handler |
( |
|
self | ) |
|
Definition at line 84 of file test_app_BaseServerManager.py.
84 def test_set__event_handler(self):
85 handler_mock = MagicMock()
86 self.base_server_manager.setEventHandler(EVENT_TYPES.NEW_TASK, handler_mock)
88 self.base_server_manager.process(self.event)
90 handler_mock.assert_called_once_with(self.event)
◆ test_stats_recv_several_msg()
def tests.test_app_BaseServerManager.TestBaseServerManager.test_stats_recv_several_msg |
( |
|
self | ) |
|
Definition at line 267 of file test_app_BaseServerManager.py.
267 def test_stats_recv_several_msg(self):
268 msg_size = sys.getsizeof(self.event)
269 fake_user_connect = None
270 connect_name = "user_connect"
271 self.event.connect_name = connect_name
272 self.base_server_manager.addConnection(connect_name, fake_user_connect)
274 self.base_server_manager.process(self.event)
275 self.base_server_manager.process(self.event)
277 self.assertTrue(self.base_server_manager.statFields[connect_name + "_send_cnt"] == 0, "")
278 self.assertTrue(self.base_server_manager.statFields[connect_name + "_recv_cnt"] == 2, "")
279 self.assertTrue(self.base_server_manager.statFields[connect_name + "_send_bytes"] == 0, "")
280 self.assertTrue(self.base_server_manager.statFields[connect_name + "_recv_bytes"] == 2 * msg_size, "")
◆ test_stats_update_invalid_stat_key()
def tests.test_app_BaseServerManager.TestBaseServerManager.test_stats_update_invalid_stat_key |
( |
|
self | ) |
|
Definition at line 283 of file test_app_BaseServerManager.py.
283 def test_stats_update_invalid_stat_key(self):
284 self.event.connect_name = "bad_stat_name"
286 self.base_server_manager.process(self.event)
◆ adminServerFake
tests.test_app_BaseServerManager.TestBaseServerManager.adminServerFake |
◆ adminState
tests.test_app_BaseServerManager.TestBaseServerManager.adminState |
◆ base_server_manager
tests.test_app_BaseServerManager.TestBaseServerManager.base_server_manager |
◆ connect_mock
tests.test_app_BaseServerManager.TestBaseServerManager.connect_mock |
◆ connect_name
tests.test_app_BaseServerManager.TestBaseServerManager.connect_name |
◆ connectionBuilderLight
tests.test_app_BaseServerManager.TestBaseServerManager.connectionBuilderLight |
◆ event
tests.test_app_BaseServerManager.TestBaseServerManager.event |
◆ event_builder
tests.test_app_BaseServerManager.TestBaseServerManager.event_builder |
◆ poller_manager_mock
tests.test_app_BaseServerManager.TestBaseServerManager.poller_manager_mock |
◆ poller_mock
tests.test_app_BaseServerManager.TestBaseServerManager.poller_mock |
◆ sock_mock
tests.test_app_BaseServerManager.TestBaseServerManager.sock_mock |
The documentation for this class was generated from the following file: