HCE Project Python language Distributed Tasks Manager Application, Distributed Crawler Application and client API bindings.  2.0.0-chaika
Hierarchical Cluster Engine Python language binding
Connection.py
Go to the documentation of this file.
1 '''
2 Created on Feb 4, 2014
3 
4 @author: igor, bgv
5 '''
6 
7 
8 import json
9 
10 from collections import namedtuple
11 import zmq
12 
13 import Consts as consts
14 from Response import Response
15 import logging
16 import app.Consts as APP_CONSTS
17 from app.Utils import varDump
18 
19 # Logger initialization
20 logger = logging.getLogger(APP_CONSTS.LOGGER_NAME)
21 
22 
# #Exception used to signal that an operation timeout has expired
#
class ConnectionTimeout(Exception):
    '''
    Raised when an operation does not complete within its timeout.
    '''

    # #constructor
    #
    # @param message human readable description of the timeout
    def __init__(self, message):
        super(ConnectionTimeout, self).__init__(message)
29 
30 
# #Exception which expresses critical errors in the network engine
#
# At the moment critical errors are zmq.ZMQError, zmq.Again
class TransportInternalErr(Exception):
    '''
    Raised when the underlying transport reports a critical error.
    '''

    # #constructor
    #
    # @param message human readable description of the error
    def __init__(self, message):
        super(TransportInternalErr, self).__init__(message)
38 
39 
# #Structure which describes all the data needed to establish a connection
#
# Fields, in order: host, port
# @todo - add a protocol param as independent part
ConnectionParams = namedtuple("ConnectionParams", ["host", "port"])
44 
45 
# #Main class of the transport layer.
#
# The class has two main methods - send and recv - which are used to
# exchange messages
class Connection(object):
    '''
    Transport class

    Thin wrapper around a zmq socket: send() writes a multipart message,
    recv() polls the socket and wraps the received frames in a Response.
    '''

    # Default recv() timeout, in msec
    RECV_DEFAULT_TIMEOUT = 5000

    # #constructor
    #
    # @param zmq_socket an instance of zmq.socket object
    # @param zmq_poller an instance of zmq.poller object
    # @param socket_type type of socket (server or client side)
    def __init__(self, zmq_socket, zmq_poller, socket_type=consts.CLIENT_CONNECT):
        self.zmq_socket = zmq_socket
        # @todo remove poller - it is only stored here, recv() polls the socket directly
        self.zmq_poller = zmq_poller
        self.socket_type = socket_type

    # #send request
    #
    # For a client side socket the frames are request.get_body(); otherwise the
    # frames are request.connect_identity followed by request.eventObj.get_body().
    # If request.route is set, the task_type is merged in to it and the updated
    # JSON is stored back in request.route and appended as the last frame.
    #
    # @param request an instance of Request object
    # @param rid request id for logging only
    # @return result of zmq_socket.send_multipart() or throw TransportInternalErr
    def send(self, request, rid=-1):
        try:
            if self.socket_type == consts.CLIENT_CONNECT:
                snd_data = request.get_body()
            else:
                snd_data = [request.connect_identity] + request.eventObj.get_body()

            # Adding the task_type is best effort: a failure is logged and the
            # message is still sent without the extra route frame.
            try:
                if request.route is not None:
                    # Work on a local copy so that request.route is replaced only
                    # after the updated route was serialized successfully.
                    route = json.loads(request.route)
                    route['task_type'] = request.task_type
                    request.route = json.dumps(route)
                    snd_data.append(request.route)
                elif request.task_type > 0:
                    snd_data.append(json.dumps({'task_type': request.task_type}))
            except Exception as err:
                logger.debug("Error add task_type in to the route data: %s", str(err))

            # Dump the payload only when DEBUG is enabled, so that the dump can
            # neither slow down nor break a send whose log line would be dropped.
            if logger.isEnabledFor(logging.DEBUG):
                logger.debug("send_multipart() socket_type: %s, id:%s snd_data: %s",
                             self.socket_type, rid, varDump(snd_data))
            result = self.zmq_socket.send_multipart(snd_data)
            logger.debug("Data sent with the zmq_socket.send_multipart()")
            return result
        except zmq.ZMQError as err:  # pylint: disable-msg=E1101
            logger.error("ZMQ error: %s", err)
            raise TransportInternalErr(str(err))
        except Exception as err:
            raise TransportInternalErr('General error: ' + str(err))

    # #receive data from connection
    #
    # @param timeout timeout in msec, default RECV_DEFAULT_TIMEOUT msec
    # @param rid request id for logging only
    # @return Response or throw ConnectionTimeout, TransportInternalErr
    def recv(self, timeout=RECV_DEFAULT_TIMEOUT, rid=-1):
        try:
            events = self.zmq_socket.poll(int(timeout), zmq.POLLIN)  # pylint: disable=E0602,E1101
            if events == zmq.POLLIN:  # pylint: disable-msg=E1101
                data = self.zmq_socket.recv_multipart()
                if self.socket_type == consts.CLIENT_CONNECT:
                    return Response(data)
                # Otherwise the first frame is handed to Response separately as
                # the identity and the remaining frames form the message body.
                return Response(data[1:], data[0])
        except (zmq.ZMQError, zmq.Again) as err:  # pylint: disable-msg=E1101
            logger.error("ZMQ error: %s", err)
            raise TransportInternalErr(str(err))
        except Exception as err:
            logger.error("General error: %s", err)
            raise TransportInternalErr(str(err))

        # Any poll result other than zmq.POLLIN is treated as a timeout. The
        # exception is raised outside of the try block on purpose: inside it the
        # generic handler would convert it in to TransportInternalErr and the
        # caller could never see ConnectionTimeout.
        raise ConnectionTimeout('Connection.recv() timeout: ' + str(timeout) + ', request id: ' + str(rid))

    # #close zmq.socket
    #
    # @return None
    def close(self):
        self.zmq_socket.close()

    # #check status of zmq.socket
    #
    # @return bool
    def is_closed(self):
        return self.zmq_socket.closed

    # #Destroys the object, close connection
    #
    # @return None
    def __del__(self):
        # Best effort only: a destructor must not raise, and zmq_socket may be
        # missing if the constructor did not complete.
        try:
            self.close()
        except Exception:
            pass
162 
def __init__(self, zmq_socket, zmq_poller, socket_type=consts.CLIENT_CONNECT)
Definition: Connection.py:62
A wrapper similar to zmsg.hpp in the sense that it encapsulates the HCE response message structure...
Definition: Response.py:20
def varDump(obj, stringify=True, strTypeMaxLen=256, strTypeCutSuffix='...', stringifyType=1, ignoreErrors=False, objectsHash=None, depth=0, indent=2, ensure_ascii=False, maxDepth=10)
Definition: Utils.py:410
def recv(self, timeout=RECV_DEFAULT_TIMEOUT, rid=-1)
Definition: Connection.py:117
def send(self, request, rid=-1)
Definition: Connection.py:76