HCE Project: Python-language Distributed Tasks Manager application, Distributed Crawler application, and client API bindings (version 2.0.0-chaika)
Hierarchical Cluster Engine Python language binding
ConnectionLight.py
Go to the documentation of this file.
1 '''
2 Created on Feb 28, 2014
3 
4 @author: igor, bgv
5 '''
6 
7 # import pickle
8 try:
9  import cPickle as pickle
10 except ImportError:
11  import pickle
12 
13 import logging
14 import zmq
15 from transport.Connection import TransportInternalErr
16 import transport.Consts as consts
17 import app.Consts as APP_CONSTS
18 from app.Utils import varDump
19 
20 
21 # Logger initialization
22 logger = logging.getLogger(APP_CONSTS.LOGGER_NAME)
23 
# #Wrapper class on zmq.Socket that operates on python objects
#
# The wrapped socket can be connected lazily: when the instance is created
# with connected=False, the first send(), recv() or poll() call connects the
# socket to addr before doing its work.
class ConnectionLight(object):

    # Default timeout used by poll(), in milliseconds
    POLL_DEFAULT_TIMEOUT = 5000

    def __init__(self, zmq_socket, socket_type, addr="", connected=True):
        '''
        Constructor

        @param zmq_socket the zmq socket object to wrap
        @param socket_type connection type, compared against consts.CLIENT_CONNECT
        @param addr address handed to zmq_socket.connect() by connect()
        @param connected False when connect() still has to be performed
        '''
        self.zmq_socket = zmq_socket
        self.socket_type = socket_type
        self.connected = connected
        self.addr = addr


    # #send python object
    #
    # On a consts.CLIENT_CONNECT socket the object is sent with send_pyobj();
    # on any other socket type it is pickled and sent as the two-part message
    # [event_obj.connect_identity, pickled event_obj].
    # Errors other than zmq.ZMQError are logged and suppressed.
    #
    # @param event_obj python object
    # @return None or throw TransportInternalErr
    def send(self, event_obj):
        try:
            logger.debug("event_obj: %s", varDump(event_obj))
            self.connect()
            if self.socket_type == consts.CLIENT_CONNECT:
                self.zmq_socket.send_pyobj(event_obj)
            else:
                pickle_event = pickle.dumps(event_obj)
                self.zmq_socket.send_multipart([event_obj.connect_identity, pickle_event])
        except zmq.ZMQError as err:  # pylint: disable-msg=E1101
            # str(err) rather than err.message: BaseException.message is
            # deprecated since Python 2.6 and does not exist in Python 3.
            raise TransportInternalErr(str(err))
        except Exception as err:
            # Best effort: non-transport failures are only logged.
            logger.error("Error `%s`", str(err))


    # #receive python object from connection
    #
    # On a consts.CLIENT_CONNECT socket the object is read with recv_pyobj();
    # on any other socket type a two-part message [identity, pickled event] is
    # read, the event is unpickled and its connect_identity is set to identity.
    # Errors other than zmq.ZMQError are logged and None is returned.
    #
    # NOTE(review): both branches unpickle data taken from the socket; pickle
    # is not safe on untrusted input - confirm that peers are trusted.
    #
    # @return python object(event) or throw TransportInternalErr
    def recv(self):
        try:
            self.connect()
            if self.socket_type == consts.CLIENT_CONNECT:
                pyObj = self.zmq_socket.recv_pyobj()
                logger.debug("pyObj: %s", varDump(pyObj))
                return pyObj
            identity, pickle_event = self.zmq_socket.recv_multipart()
            event = pickle.loads(pickle_event)
            event.connect_identity = identity
            logger.debug("event: %s", varDump(event))
            return event
        except zmq.ZMQError as err:  # pylint: disable-msg=E1101
            raise TransportInternalErr(err)
        except Exception as err:
            # Best effort: non-transport failures are only logged.
            logger.error("Error `%s`", str(err))
            return None


    # #poll the socket for events
    #
    # @param timeout the timeout (in milliseconds) to wait for an event
    # @param flags the event flags to poll for
    # @return The events that are ready and waiting.
    # Will be 0 if no events were ready by the time timeout was reached
    def poll(self, timeout=POLL_DEFAULT_TIMEOUT, flags=zmq.POLLIN):  # pylint: disable-msg=E1101
        self.connect()
        return self.zmq_socket.poll(int(timeout), flags)


    # #close zmq.socket
    #
    # @return None
    def close(self):
        # connect() is deliberately not called here: a socket that was never
        # connected does not need a connection just to be closed, and a failing
        # connect() would prevent the socket from being closed at all.
        self.zmq_socket.close()
        self.connected = False


    # #Really connect to listen socket with addr
    #
    # Does nothing unless self.connected is exactly False.
    #
    # @return None
    def connect(self):
        if self.connected is False:
            self.zmq_socket.connect(self.addr)
            self.connected = True
# Doxygen cross-references for this listing:
#   ConnectionLight.poll(self, timeout=POLL_DEFAULT_TIMEOUT, flags=zmq.POLLIN)
#   app.Utils.varDump(obj, stringify=True, strTypeMaxLen=256, strTypeCutSuffix='...', stringifyType=1, ignoreErrors=False, objectsHash=None, depth=0, indent=2, ensure_ascii=False, maxDepth=10)
#     Definition: Utils.py:410
#   ConnectionLight.__init__(self, zmq_socket, socket_type, addr="", connected=True)