HCE Project Python language Distributed Tasks Manager Application, Distributed Crawler Application and client API bindings.  2.0.0-chaika
Hierarchical Cluster Engine Python language binding
DRCEManager.py
Go to the documentation of this file.
1 '''
2 Created on Feb 14, 2014
3 
4 @author: igor
5 '''
6 
7 import logging
8 from collections import namedtuple
9 
10 from transport.Connection import ConnectionParams, ConnectionTimeout
11 from transport.Connection import TransportInternalErr
12 
13 from drce.CommandConvertor import CommandConvertor
14 from drce.CommandExecutor import CommandExecutor, CommandExecutorErr
15 from drce.ConnectionManager import ConnectionManager
16 
17 import app.Consts as APP_CONSTS
18 # Logger initialization
19 logger = logging.getLogger(APP_CONSTS.LOGGER_NAME)
20 
21 # #High level users class. May contains various info
22 #
23 HostParams = namedtuple("HostParams", "host port")
24 
25 
26 # # Class which provides base functionality for processing drce tasks(commands)
27 #
28 # For customisation of behaviour implement hook* methods
29 class DRCEManager(object):
30 
31 
32  # #constructor
33  def __init__(self):
34  '''
35  Constructor
36  '''
37  # @var connection
38  # a member variable, holds current connection,
39  # will be used in inheritance classes
40  self.connection = None
41  # @var connect_params
42  # a member variable, holds current connect_params
43  # will be used in inheritance classes
44  self.connect_params = None
47 
48 
49  # #create connection for a host
50  #
51  # @param host_params an instance HostParams
52  # @return None
53  def activate_host(self, host_params):
54  con_params = ConnectionParams(host_params.host, host_params.port)
55  self.connection = self.connect_manager.create_connection(con_params)
56  self.connect_params = con_params
57  self.cmd_executor.replace_connection(self.connection)
58 
59 
60  # #free resources related to self.connection object
61  #
62  # @return None
63  def clear_host(self):
64  self.connect_manager.destroy_connection(self.connection)
65  self.connection = None
66  self.connect_params = None
67  self.cmd_executor.replace_connection(self.connection)
68 
69 
70  # #process command
71  #
72  # #by default - all throws are raised up (propagate out)
73  # @param commans one from Commands.taskrequest*
74  # @param timeout max wait time for processing command
75  # @param ttl time of task live
76  # @return TaskResponse
77  def process(self, command, timeout=3000, ttl=300000):
78  try:
79  return self.cmd_executor.execute(command, timeout, ttl)
80  except ConnectionTimeout as err:
81  self.timeout_hook(err, command, timeout)
82  except TransportInternalErr as err:
83  self.transport_err_hook(err, command, timeout)
84  except CommandExecutorErr as err:
85  self.executor_err_hook(err, command, timeout)
86 
87  # #function will be called when raises ConnectionTimeout
88  #
89  # @param connection_timeout an instance of ConnectionTimeout
90  # @param command the current processing command
91  # @param timeout timeout related to the command
92  # @return re-raise connection_timeout
93  # pylint: disable=W0612
94  def timeout_hook(self, connection_timeout, command, timeout):
95  del command, timeout
96  raise connection_timeout
97 
98 
99  # #function will be called when raises TransportInternalErr
100  #
101  # @param transport_internal_err an instance of TransportInternalErr
102  # @param command the current processing command
103  # @param timeout timeout related to the command
104  # @return re-raise transport_internal_err
105  def transport_err_hook(self, transport_internal_err, command, timeout):
106  del command, timeout
107  raise transport_internal_err
108 
109 
110  # #function will be called when raises CommandExecutorError
111  #
112  # @param command_executor_err an instance of CommandExecutorError
113  # @param command the current processing command
114  # @param timeout timeout related to the command
115  # @return re-raise command_executor_err
116  def executor_err_hook(self, command_executor_err, command, timeout):
117  del command, timeout
118  raise command_executor_err
def process(self, command, timeout=3000, ttl=300000)
Definition: DRCEManager.py:77
def execute(self, commands, nodes)
execute method execute incoming commands on nodes, keepts reult in responses and responsesDicts field...
Definition: NodeManager.py:63
def activate_host(self, host_params)
Definition: DRCEManager.py:53
def executor_err_hook(self, command_executor_err, command, timeout)
Definition: DRCEManager.py:116
Simple wrapper that hide all routines related to create and destroy connections.
def transport_err_hook(self, transport_internal_err, command, timeout)
Definition: DRCEManager.py:105
Convertor which used to convert Task*Reques to json and TaskResponse from json.
def timeout_hook(self, connection_timeout, command, timeout)
Definition: DRCEManager.py:94