◆ setUp()
def tests.test_drce_CommandExecutor.TestCommandExecutor.setUp |
( |
|
self | ) |
|
Definition at line 28 of file test_drce_CommandExecutor.py.
29 self.connect_mock = MagicMock(spec=Connection)
30 self.cmd_executor = CommandExecutor(self.connect_mock, CommandConvertor())
◆ teEst_execute_request()
def tests.test_drce_CommandExecutor.TestCommandExecutor.teEst_execute_request |
( |
|
self | ) |
|
Definition at line 33 of file test_drce_CommandExecutor.py.
33 def teEst_execute_request(self):
39 "error_message" :"msg",
44 {"files":[{"name":"f1", "data":"data1", "action":12}],
55 connect_mock_cfg = {"send.return_value": None,
56 "recv.return_value": response}
57 self.connect_mock.configure_mock(**connect_mock_cfg)
60 connect_expect_calls = [call.send(ANY), call.recv(ANY)]
62 task_request = TaskCheckRequest("task_id", consts.EXTEND_STATUS_INFO)
63 task_response = self.cmd_executor.execute(task_request)
65 self.connect_mock.assert_has_calls(connect_expect_calls)
67 test_err_str = "json parsing is failed"
68 self.assertEqual(task_response.error_code, 0, test_err_str)
69 self.assertEqual(task_response.error_msg, "msg", test_err_str)
70 self.assertEqual(task_response.state, 1, test_err_str)
71 self.assertEqual(len(task_response.data), 1, test_err_str)
72 self.assertEqual(task_response.pid, 101, test_err_str)
def execute(self, commands, nodes)
The execute method executes the incoming commands on the nodes and keeps the result in the responses and responsesDicts fields...
◆ test_catch_convert_exception_cmd_format()
def tests.test_drce_CommandExecutor.TestCommandExecutor.test_catch_convert_exception_cmd_format |
( |
|
self | ) |
|
Definition at line 96 of file test_drce_CommandExecutor.py.
96 def test_catch_convert_exception_cmd_format(self):
97 self.connect_mock.send.return_value = None
98 self.connect_mock.recv.side_effect = CommandConvertorError("Boom")
100 task_request = TaskCheckRequest("task_id", consts.EXTEND_STATUS_INFO)
102 with self.assertRaises(CommandExecutorErr):
103 list(self.cmd_executor.execute(task_request))
def execute(self, commands, nodes)
The execute method executes the incoming commands on the nodes and keeps the result in the responses and responsesDicts fields...
◆ test_catch_convert_exception_msg_format()
def tests.test_drce_CommandExecutor.TestCommandExecutor.test_catch_convert_exception_msg_format |
( |
|
self | ) |
|
Definition at line 86 of file test_drce_CommandExecutor.py.
86 def test_catch_convert_exception_msg_format(self):
87 self.connect_mock.send.return_value = None
88 self.connect_mock.recv.side_effect = ResponseFormatErr("Boom")
90 task_request = TaskCheckRequest("task_id", consts.EXTEND_STATUS_INFO)
92 with self.assertRaises(CommandExecutorErr):
93 list(self.cmd_executor.execute(task_request))
def execute(self, commands, nodes)
The execute method executes the incoming commands on the nodes and keeps the result in the responses and responsesDicts fields...
◆ test_let_out_timeout_exception()
def tests.test_drce_CommandExecutor.TestCommandExecutor.test_let_out_timeout_exception |
( |
|
self | ) |
|
Definition at line 76 of file test_drce_CommandExecutor.py.
76 def test_let_out_timeout_exception(self):
77 self.connect_mock.send.return_value = None
78 self.connect_mock.recv.side_effect = ConnectionTimeout("Boom")
79 task_request = TaskCheckRequest("task_id", consts.EXTEND_STATUS_INFO)
81 with self.assertRaises(ConnectionTimeout):
82 list(self.cmd_executor.execute(task_request))
def execute(self, commands, nodes)
The execute method executes the incoming commands on the nodes and keeps the result in the responses and responsesDicts fields...
◆ cmd_executor
tests.test_drce_CommandExecutor.TestCommandExecutor.cmd_executor |
◆ connect_mock
tests.test_drce_CommandExecutor.TestCommandExecutor.connect_mock |
The documentation for this class was generated from the following file: