HCE Project Python language Distributed Tasks Manager Application, Distributed Crawler Application and client API bindings.  2.0.0-chaika
Hierarchical Cluster Engine Python language binding
test_transport_Connection.py
Go to the documentation of this file.
1 '''
2 Created on Feb 4, 2014
3 
4 @author: igor
5 '''
6 import unittest
7 import zmq
8 from transport.Connection import Connection
9 from transport.Connection import ConnectionTimeout
10 #from transport.Connection import ConnectionParams
11 from transport.Request import Request
12 
13 
class TestConnection(unittest.TestCase):
    """Tests for transport.Connection driven through a local ZeroMQ socket pair.

    Each test gets a fresh context and a connected pair of sockets:
    ``connect_pair[0]`` is wrapped by the Connection under test and
    ``connect_pair[1]`` plays the remote peer.
    """

    def setUp(self):
        """Build the context, socket pair, poller and Connection for one test."""
        self.context = zmq.Context()
        # Registered before any socket is created so the context is released
        # even if a later setUp step raises. destroy() closes the sockets that
        # are still open on this context and then terminates it.
        self.addCleanup(self.context.destroy, linger=0)
        # NOTE(review): the assignment of connect_pair was not present in the
        # reviewed text although the attribute is used below and in the tests;
        # it is restored here with the helper's default socket types -- confirm
        # against the intended types.
        self.connect_pair = self.create_bound_pair()
        self.poller = zmq.Poller()
        self.connection = Connection(self.connect_pair[0], self.poller)

    def test_send(self):
        """A sent Request must reach the peer as the frames [uid, data]."""
        request = Request("1")
        request.add_data("Hello people!")
        expected_frames = [b"1", b"Hello people!"]

        self.connection.send(request)
        received_frames = self.connect_pair[1].recv_multipart()

        self.assertEqual(expected_frames, received_frames,
                         "connection send doesn't work")

    def test_recv_message(self):
        """Frames sent by the peer must surface as the response uid and body."""
        message = ["sock_identity", "232", "New message"]
        self.connect_pair[1].send_multipart(message)
        response = self.connection.recv(1000)

        # Only the first two frames are checked: frame 0 against the uid and
        # frame 1 against the body.
        self.assertEqual(message[0], response.get_uid(),
                         "connection recv is failed")
        self.assertEqual(message[1], response.get_body(),
                         "connection recv is failed")

    # NOTE(review): the name of this test was not present in the reviewed
    # text; "test_recv_timeout" is a reconstruction -- confirm.
    def test_recv_timeout(self):
        """recv() with nothing sent by the peer must raise ConnectionTimeout."""
        with self.assertRaises(ConnectionTimeout):
            self.connection.recv(250)

    # NOTE(review): the name of this test was not present in the reviewed
    # text; "test_is_closed" is a reconstruction -- confirm.
    def test_is_closed(self):
        """A freshly created Connection must not report itself as closed."""
        self.assertFalse(self.connection.is_closed(), "sock must be valid")

    def test_disconnect(self):
        """After close() the Connection must report itself as closed."""
        self.connection.close()
        self.assertTrue(self.connection.is_closed(), "close doesn't work")

    def create_bound_pair(self, type1=zmq.PAIR, type2=zmq.PAIR,
                          interface='tcp://127.0.0.1'):
        """Create a connected socket pair on a random port of ``interface``.

        :param type1: ZeroMQ socket type of the socket that binds.
        :param type2: ZeroMQ socket type of the socket that connects.
        :param interface: endpoint prefix (transport and address, no port).
        :return: tuple ``(connecting_socket, bound_socket)`` -- note that the
            connecting socket comes first.
        """
        bound_sock = self.context.socket(type1)
        # LINGER 0: closing must not wait for unsent messages.
        bound_sock.setsockopt(zmq.LINGER, 0)
        port = bound_sock.bind_to_random_port(interface)

        connecting_sock = self.context.socket(type2)
        connecting_sock.setsockopt(zmq.LINGER, 0)
        connecting_sock.connect('%s:%s' % (interface, port))

        return connecting_sock, bound_sock
def create_bound_pair(self, type1=zmq.PAIR, type2=zmq.PAIR, interface='tcp://127.0.0.1')
It's a wrapper similar to zmsg.hpp in the sense that it encapsulates the HCE message structure.
Definition: Request.py:11