7 from mock
import MagicMock, call, ANY
24 self.
config = ConfigParser.RawConfigParser()
29 self.
config.add_section(CONFIG_SECTION)
30 self.
config.set(CONFIG_SECTION,
"serverHost", addr)
31 self.
config.set(CONFIG_SECTION,
"serverPort", port)
32 self.
config.set(CONFIG_SECTION,
"clientTaskManager",
"TaskManager")
33 self.
config.set(CONFIG_SECTION,
"clientExecutionEnvironmentManager",
"ExecutionEnvironmentManager")
51 connectBuilder_mock = MagicMock(spec = ConnectionBuilderLight)
54 expect_connect_calls = [call.build(tr_const.SERVER_CONNECT, self.
tcpAddr, tr_const.TCP_TYPE),
55 call.build(tr_const.CLIENT_CONNECT,
'TaskManager'),
56 call.build(tr_const.CLIENT_CONNECT,
'ExecutionEnvironmentManager')]
58 self.assertEqual(expect_connect_calls, connectBuilder_mock.mock_calls,
"config parsing is failed")
62 taskManager_mock = MagicMock()
64 transport_events = [EVENT_TYPES.NEW_TASK, EVENT_TYPES.UPDATE_TASK, EVENT_TYPES.GET_TASK_STATUS,
65 EVENT_TYPES.FETCH_RESULTS_CACHE, EVENT_TYPES.DELETE_TASK]
74 for eventType
in transport_events:
78 self.assertEqual(len(transport_events), taskManager_mock.call_count,
"")
83 taskManager_mock = MagicMock()
85 transport_events = [EVENT_TYPES.CHECK_TASK_STATE, EVENT_TYPES.FETCH_TASK_RESULTS]
94 for eventType
in transport_events:
98 self.assertEqual(len(transport_events), taskManager_mock.call_count,
"")
103 taskManager_mock = MagicMock()
105 transport_events = [EVENT_TYPES.NEW_TASK_RESPONSE, EVENT_TYPES.UPDATE_TASK_RESPONSE,
106 EVENT_TYPES.CHECK_TASK_STATE_RESPONSE, EVENT_TYPES.GET_TASK_STATUS_RESPONSE,
107 EVENT_TYPES.FETCH_TASK_RESULTS_RESPONSE, EVENT_TYPES.DELETE_TASK_RESPONSE]
116 for eventType
in transport_events:
120 self.assertEqual(len(transport_events), taskManager_mock.call_count,
"")
127 event = self.
eventBuilder.build(EVENT_TYPES.NEW_TASK,
"data obj")
128 event.connect_identity =
"1234" 129 event.connect_name =
"testName" 135 self.assertEqual(process_event.connect_identity,
"1234",
"")
136 self.assertEqual(process_event.connect_name,
"testName",
"")
137 self.assertEqual(process_event.eventObj,
None,
"")
144 event = self.
eventBuilder.build(EVENT_TYPES.NEW_TASK,
"data obj")
153 event = self.
eventBuilder.build(EVENT_TYPES.NEW_TASK,
"data obj")
def test_route_response_events(self)
def test_supress_keyerror_exception(self)
Class hides routines of bulding connection objects.
def test_route_event_to_eeManager(self)
def test_reading_config_params(self)
The gateway for dmt client communications.
def test_route_event_to_taskManager(self)
def test_drop_process_event(self)
def test_add_process_event(self)