HCE Project Python language Distributed Tasks Manager Application, Distributed Crawler Application and client API bindings.  2.0.0-chaika
Hierarchical Cluster Engine Python language binding
CommandExecutor.py
Go to the documentation of this file.
1 '''
2 Created on Feb 13, 2014
3 
4 @author: igor
5 '''
6 
7 import json
8 
9 from transport.Request import Request
10 from transport.Response import ResponseFormatErr
11 from transport.UIDGenerator import UIDGenerator
12 from drce.CommandConvertor import CommandConvertorError, TaskExecuteStructEncoder
13 from drce.Commands import DRCECover
14 import app.Utils as Utils # pylint: disable=F0401
15 
16 # Logger initialization
17 logger = Utils.MPLogger().getLogger()
18 
19 
20 # import base64
21 # #Exception which encapsulates errors related to convertatin operations
22 #
23 class CommandExecutorErr(Exception):
24 
25  def __init__(self, msg):
26  Exception.__init__(self, msg)
27 
28 
29 # #Main point for execution commands related to the drce
30 # It hides all complexities of command execution
31 #
32 class CommandExecutor(object):
33 
34 
35  # #constructor
36  #
37  # @param connection an instance of transport.Connection object
38  # @param cmd_convertor an instance of CommandConvertor object
39  def __init__(self, connection, cmd_convertor):
40  self.connection = connection
41  self.cmd_convertor = cmd_convertor
42  # @var id_generator
43  # a member variable, used to generate uid for messages
45 
46 
47  # #simple wrapper to explicit express operation of replace connection
48  # add ability to execute commands on many connections
49  #
50  # @param connection an instance of transport.Connection object
51  # @return None
52  def replace_connection(self, connection):
53  self.connection = connection
54 
55 
56 
57  # #execute Task*Request command
58  #
59  # because a connection is external dependence - doesn't catch it exceptions
60  # @param command an instance of Task*Request object
61  # @param timeout timeout in msec
62  # @param ttl time of task live in msec
63  # @param maxTries max tries to receive message, if zero and received message that is not the same id as requested
64  # it will be returned, if grater than zero than N tries to receive message will be performed. If
65  # timeout reached it breaks loop and exception raised
66  # @return TaskResponse or throw CommandExecutorErr
67  def execute(self, command, timeout=1000, ttl=30000, maxTries=100):
68  if maxTries < 0:
69  maxTries = 0
70 
71  try:
72  logger.debug("command: %s", Utils.varDump(command, strTypeMaxLen=10000))
73  cmd_json = self.cmd_convertor.to_json(command, logger)
74  drce_cover_envelop = DRCECover(ttl, cmd_json)
75  request = Request(self.id_generator.get_uid())
76  request.add_data(json.dumps(drce_cover_envelop, cls=TaskExecuteStructEncoder))
77  request.route = command.route
78  request.task_type = command.task_type
79 
80  uid = request.get_uid()
81  logger.debug("Send DRCE request msg Id:" + uid + ", route: " + str(request.route))
82 
83  self.connection.send(request, uid)
84 
85  for i in range(int(maxTries) + 1):
86  response = self.connection.recv(timeout, uid)
87  logger.debug("Received DRCE response msg Id:" + response.get_uid())
88  if response.get_uid() == request.get_uid() or maxTries == 0:
89  cover_envelop_response = json.loads(response.get_body())
90  return self.cmd_convertor.from_json(cover_envelop_response["data"])
91  else:
92  logger.error("DRCE response msg Id:" + response.get_uid() + \
93  " not matched with the request msg Id:" + request.get_uid() + \
94  ", try: " + str(i))
95  except (ResponseFormatErr, CommandConvertorError, KeyError, CommandExecutorErr) as err:
96  logger.error("DRCE object model error: %s", str(err))
97  raise CommandExecutorErr(str(err))
98  except Exception as err:
99  logger.error("General error: %s", str(err))
100  raise CommandExecutorErr(str(err))
101 
def execute(self, command, timeout=1000, ttl=30000, maxTries=100)
def replace_connection(self, connection)
UIDGenerator is used to generate unique message id.
Definition: UIDGenerator.py:14
It&#39;s a wrapper similar to zmsg.hpp in sense of encapsulation of hce message structure.
Definition: Request.py:11
wrapper for cover object
Definition: Commands.py:145
def __init__(self, connection, cmd_convertor)