HCE Project Python language Distributed Tasks Manager Application, Distributed Crawler Application and client API bindings.  2.0.0-chaika
Hierarchical Cluster Engine Python language binding
transport.Connection.Connection Class Reference
Inheritance diagram for transport.Connection.Connection:
Collaboration diagram for transport.Connection.Connection:

Public Member Functions

def __init__ (self, zmq_socket, zmq_poller, socket_type=consts.CLIENT_CONNECT)
 
def send (self, request, rid=-1)
 
def recv (self, timeout=RECV_DEFAULT_TIMEOUT, rid=-1)
 
def close (self)
 
def is_closed (self)
 

Public Attributes

 zmq_socket
 
 zmq_poller
 
 socket_type
 

Static Public Attributes

int RECV_DEFAULT_TIMEOUT = 5000
 

Detailed Description

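Transport class: wraps a ZeroMQ socket and exposes send() and recv() for multipart messages (recv() polls the socket with a timeout), plus close() and is_closed().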

Definition at line 50 of file Connection.py.

Constructor & Destructor Documentation

◆ __init__()

def transport.Connection.Connection.__init__ (   self,
  zmq_socket,
  zmq_poller,
  socket_type = consts.CLIENT_CONNECT 
)

Definition at line 62 of file Connection.py.

62  def __init__(self, zmq_socket, zmq_poller, socket_type=consts.CLIENT_CONNECT):
63  self.zmq_socket = zmq_socket
64  self.zmq_poller = zmq_poller
65  self.socket_type = socket_type
66  # @todo remove poller
67  # self.zmq_poller.register(self.zmq_socket, zmq.POLLIN)
68 
69 
70 
def __init__(self)
constructor
Definition: UIDGenerator.py:19

Member Function Documentation

◆ close()

def transport.Connection.Connection.close (   self)

Definition at line 142 of file Connection.py.

142  def close(self):
143  # self.zmq_poller.unregister(self.zmq_socket) #is right place??
144  self.zmq_socket.close()
145 
146 

◆ is_closed()

def transport.Connection.Connection.is_closed (   self)

Definition at line 150 of file Connection.py.

150  def is_closed(self):
151  return self.zmq_socket.closed
152 
153 

◆ recv()

def transport.Connection.Connection.recv (   self,
  timeout = RECV_DEFAULT_TIMEOUT,
  rid = -1 
)

Definition at line 117 of file Connection.py.

117  def recv(self, timeout=RECV_DEFAULT_TIMEOUT, rid=-1):
118  try:
119  events = self.zmq_socket.poll(int(timeout), zmq.POLLIN) # pylint: disable=E0602,E1101
120  if events == zmq.POLLIN: # pylint: disable-msg=E1101
121  if self.socket_type == consts.CLIENT_CONNECT:
122  return Response(self.zmq_socket.recv_multipart())
123  else:
124  data = self.zmq_socket.recv_multipart()
125  identity = data[:1]
126  return Response(data[1:], identity[0])
127  else:
128  raise ConnectionTimeout('Connection.recv() timeout: ' + str(timeout) + ', request id: ' + str(rid))
129  except (zmq.ZMQError, zmq.Again) as err: # pylint: disable-msg=E1101
130  logger.error("ZMQ error: " + str(err))
131  raise TransportInternalErr(str(err))
132  except Exception as err:
133  logger.error("General error: " + str(err))
134  raise TransportInternalErr(str(err))
135  # finally:
136 # self.zmq_poller.unregister(self.zmq_socket)
137 
138 

◆ send()

def transport.Connection.Connection.send (   self,
  request,
  rid = -1 
)

Definition at line 76 of file Connection.py.

76  def send(self, request, rid=-1):
77  try:
78  if self.socket_type == consts.CLIENT_CONNECT:
79  snd_data = request.get_body()
80  else:
81  # snd_data = list(request.eventObj.get_body())
82  # snd_data.insert(0, request.connect_identity)
83  # return self.zmq_socket.send_multipart(snd_data)
84  snd_data = [request.connect_identity] + request.eventObj.get_body()
85 
86  try:
87  if request.route is not None:
88  request.route = json.loads(request.route)
89  request.route['task_type'] = request.task_type
90  request.route = json.dumps(request.route)
91  snd_data.append(request.route)
92  else:
93  if request.task_type > 0:
94  snd_data.append(json.dumps({'task_type':request.task_type}))
95 
96  except Exception as err:
97  logger.debug("Error add task_type in to the route data: %s", str(err))
98 
99  logger.debug("send_multipart() socket_type: " + str(self.socket_type) + ", id:" + str(rid) + \
100  " snd_data: " + varDump(snd_data))
101  r = self.zmq_socket.send_multipart(snd_data)
102  logger.debug("Data sent with the zmq_socket.send_multipart()")
103  return r
104  except zmq.ZMQError as err: # pylint: disable-msg=E1101
105  logger.error("ZMQ error: " + str(err))
106  raise TransportInternalErr(str(err))
107  except Exception as err:
108  raise TransportInternalErr('General error: ' + str(err))
109 
110 
111 
def varDump(obj, stringify=True, strTypeMaxLen=256, strTypeCutSuffix='...', stringifyType=1, ignoreErrors=False, objectsHash=None, depth=0, indent=2, ensure_ascii=False, maxDepth=10)
Definition: Utils.py:410
Here is the call graph for this function:

Member Data Documentation

◆ RECV_DEFAULT_TIMEOUT

int transport.Connection.Connection.RECV_DEFAULT_TIMEOUT = 5000
static

Definition at line 55 of file Connection.py.

◆ socket_type

transport.Connection.Connection.socket_type

Definition at line 65 of file Connection.py.

◆ zmq_poller

transport.Connection.Connection.zmq_poller

Definition at line 64 of file Connection.py.

◆ zmq_socket

transport.Connection.Connection.zmq_socket

Definition at line 63 of file Connection.py.


The documentation for this class was generated from the following file: