HCE Project Python language Distributed Tasks Manager Application, Distributed Crawler Application and client API bindings.  2.0.0-chaika
Hierarchical Cluster Engine Python language binding
tests.test_app_PollerManager.TestPollerManager Class Reference

Public Member Functions

def setUp (self)
 
def test_add_conections (self)
 
def test_remove_conenction (self)
 
def test_timeout (self)
 
def test_return_names (self)
 

Public Attributes

 sock_mock
 
 poller_mock
 
 poller_manager
 

Detailed Description

Definition at line 14 of file test_app_PollerManager.py.

Member Function Documentation

◆ setUp()

def tests.test_app_PollerManager.TestPollerManager.setUp (   self)

Definition at line 17 of file test_app_PollerManager.py.

17  def setUp(self):
18      self.sock_mock = MagicMock(spec=zmq.Socket)
19      self.poller_mock = MagicMock(spec=zmq.Poller)
20      self.poller_manager = PollerManager(self.poller_mock)
21 
22 
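The fixture above passes spec= to MagicMock so that each mock exposes only the attributes of the class it stands in for. The following is a minimal sketch of that behaviour, not code from the HCE test suite. It assumes Python 3's unittest.mock, and FakeSocket is a hypothetical stand-in for zmq.Socket so the sketch runs without pyzmq installed.

```python
from unittest.mock import MagicMock


class FakeSocket:
    """Hypothetical stand-in for zmq.Socket, used only in this sketch."""

    def send(self, data):
        raise NotImplementedError

    def recv(self):
        raise NotImplementedError


# spec= limits the mock to the attributes that FakeSocket defines.
sock_mock = MagicMock(spec=FakeSocket)

# Methods present on the spec are mocked and record their calls.
sock_mock.send(b"ping")
sock_mock.send.assert_called_once_with(b"ping")

# Attributes missing from the spec raise AttributeError instead of
# silently returning a new mock, which catches misspelled method names.
try:
    sock_mock.sned(b"ping")
    typo_detected = False
except AttributeError:
    typo_detected = True
```

A mock created with spec= also passes isinstance checks against the spec class, which can matter when the code under test validates the type of its arguments.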

◆ test_add_conections()

def tests.test_app_PollerManager.TestPollerManager.test_add_conections (   self)

Definition at line 23 of file test_app_PollerManager.py.

23  def test_add_conections(self):
24      connection = Connection(self.sock_mock, self.poller_mock)
25      self.poller_manager.add(connection, "sname")
26
27      self.assertEqual(self.poller_manager.connections[self.sock_mock], "sname", "add is failed")
28 
29 

◆ test_remove_conenction()

def tests.test_app_PollerManager.TestPollerManager.test_remove_conenction (   self)

Definition at line 30 of file test_app_PollerManager.py.

30  def test_remove_conenction(self):
31      connection = Connection(self.sock_mock, self.poller_mock)
32
33      self.poller_manager.add(connection, "sname")
34      self.poller_manager.remove(connection)
35
36      with self.assertRaises(KeyError):
37          list(self.poller_manager.connections[self.sock_mock])
38 
39 

◆ test_return_names()

def tests.test_app_PollerManager.TestPollerManager.test_return_names (   self)

Definition at line 48 of file test_app_PollerManager.py.

48  def test_return_names(self):
49      poller_mock_cfg = {"poll.return_value": [(self.sock_mock, zmq.POLLIN)]}
50      self.poller_mock.configure_mock(**poller_mock_cfg)
51
52      connection = Connection(self.sock_mock, self.poller_mock)
53      self.poller_manager.add(connection, "sname")
54
55      expect_list = ["sname"]
56
57      res = self.poller_manager.poll(100)
58
59      self.assertEqual(expect_list, res, "poll is failed")
60 
61 
62 
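The test above relies on configure_mock accepting a dotted key: "poll.return_value" sets the return value of the mock's poll method. The sketch below illustrates the pattern under stated assumptions. SketchPollerManager is a hypothetical, simplified stand-in (it registers sockets directly rather than Connection objects) and is not the HCE PollerManager implementation. POLLIN is a placeholder constant for zmq.POLLIN so the sketch runs without pyzmq.

```python
from unittest.mock import MagicMock

POLLIN = 1  # placeholder for zmq.POLLIN; the value is not used by the sketch


class SketchPollerManager:
    """Hypothetical stand-in: maps ready sockets back to registered names."""

    def __init__(self, poller):
        self.poller = poller
        self.connections = {}

    def add(self, sock, name):
        self.connections[sock] = name

    def poll(self, timeout):
        # The poller is expected to return (socket, event) pairs.
        events = self.poller.poll(timeout)
        return [self.connections[sock] for sock, _event in events]


sock_mock = MagicMock()
poller_mock = MagicMock()

# A dotted key configures an attribute of a child mock in one call;
# this is equivalent to: poller_mock.poll.return_value = [...]
poller_mock.configure_mock(**{"poll.return_value": [(sock_mock, POLLIN)]})

manager = SketchPollerManager(poller_mock)
manager.add(sock_mock, "sname")
names = manager.poll(100)

# The mock records how it was called, so the timeout can be checked too.
poller_mock.poll.assert_called_once_with(100)
```

Because the poller is injected through the constructor, the sketch never opens a real socket; the same design choice is what lets the documented test drive poll results from a mock.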

◆ test_timeout()

def tests.test_app_PollerManager.TestPollerManager.test_timeout (   self)

Definition at line 40 of file test_app_PollerManager.py.

40  def test_timeout(self):
41      poller_mock_cfg = {"poll.return_value": list()}
42      self.poller_mock.configure_mock(**poller_mock_cfg)
43
44      with self.assertRaises(ConnectionTimeout):
45          list(self.poller_manager.poll(100))
46 
47 
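The timeout test above configures the mocked poller to return no events and expects an exception. A minimal, self-contained sketch of that pattern follows. SketchTimeout and poll_or_raise are hypothetical names introduced for illustration; how the real PollerManager decides to raise ConnectionTimeout is not shown in this page, so treating an empty poll result as a timeout is an assumption.

```python
import unittest
from unittest.mock import MagicMock


class SketchTimeout(Exception):
    """Hypothetical stand-in for the ConnectionTimeout used in the tests."""


def poll_or_raise(poller, timeout):
    # Assumption: an empty event list means nothing arrived in time.
    events = poller.poll(timeout)
    if not events:
        raise SketchTimeout("no events within %d ms" % timeout)
    return events


class TimeoutSketchTest(unittest.TestCase):
    def test_empty_poll_raises(self):
        poller_mock = MagicMock()
        poller_mock.configure_mock(**{"poll.return_value": []})

        # assertRaises as a context manager fails the test unless the
        # block raises the named exception type.
        with self.assertRaises(SketchTimeout):
            poll_or_raise(poller_mock, 100)


# Run the single test case programmatically and keep the result object.
suite = unittest.defaultTestLoader.loadTestsFromTestCase(TimeoutSketchTest)
result = unittest.TextTestRunner(verbosity=0).run(suite)
```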

Member Data Documentation

◆ poller_manager

tests.test_app_PollerManager.TestPollerManager.poller_manager

Definition at line 20 of file test_app_PollerManager.py.

◆ poller_mock

tests.test_app_PollerManager.TestPollerManager.poller_mock

Definition at line 19 of file test_app_PollerManager.py.

◆ sock_mock

tests.test_app_PollerManager.TestPollerManager.sock_mock

Definition at line 18 of file test_app_PollerManager.py.


The documentation for this class was generated from the following file: