10 from collections
import namedtuple
13 import Consts
as consts
14 from Response
import Response
20 logger = logging.getLogger(APP_CONSTS.LOGGER_NAME)
28 Exception.__init__(self, message)
37 Exception.__init__(self, message)
43 ConnectionParams = namedtuple(
"ConnectionParams",
"host port")
55 RECV_DEFAULT_TIMEOUT = 5000
62 def __init__(self, zmq_socket, zmq_poller, socket_type=consts.CLIENT_CONNECT):
76 def send(self, request, rid=-1):
79 snd_data = request.get_body()
84 snd_data = [request.connect_identity] + request.eventObj.get_body()
87 if request.route
is not None:
88 request.route = json.loads(request.route)
89 request.route[
'task_type'] = request.task_type
90 request.route = json.dumps(request.route)
91 snd_data.append(request.route)
93 if request.task_type > 0:
94 snd_data.append(json.dumps({
'task_type':request.task_type}))
96 except Exception
as err:
97 logger.debug(
"Error add task_type in to the route data: %s", str(err))
99 logger.debug(
"send_multipart() socket_type: " + str(self.
socket_type) +
", id:" + str(rid) + \
100 " snd_data: " +
varDump(snd_data))
102 logger.debug(
"Data sent with the zmq_socket.send_multipart()")
104 except zmq.ZMQError
as err:
105 logger.error(
"ZMQ error: " + str(err))
107 except Exception
as err:
117 def recv(self, timeout=RECV_DEFAULT_TIMEOUT, rid=-1):
119 events = self.
zmq_socket.poll(int(timeout), zmq.POLLIN)
120 if events == zmq.POLLIN:
126 return Response(data[1:], identity[0])
128 raise ConnectionTimeout(
'Connection.recv() timeout: ' + str(timeout) +
', request id: ' + str(rid))
129 except (zmq.ZMQError, zmq.Again)
as err:
130 logger.error(
"ZMQ error: " + str(err))
132 except Exception
as err:
133 logger.error(
"General error: " + str(err))
def __init__(self, message)
def __init__(self, zmq_socket, zmq_poller, socket_type=consts.CLIENT_CONNECT)
It's a wrapper similar to zmsg.hpp in the sense that it encapsulates the hce response message structure...
def __init__(self, message)
def varDump(obj, stringify=True, strTypeMaxLen=256, strTypeCutSuffix='...', stringifyType=1, ignoreErrors=False, objectsHash=None, depth=0, indent=2, ensure_ascii=False, maxDepth=10)
def recv(self, timeout=RECV_DEFAULT_TIMEOUT, rid=-1)
def send(self, request, rid=-1)