HCE Project Python language Distributed Tasks Manager Application, Distributed Crawler Application and client API bindings.  2.0.0-chaika
Hierarchical Cluster Engine Python language binding
ClientInterfaceService.py
Go to the documentation of this file.
1 '''
2 @package: dtm
3 @author igor, bgv
4 @link: http://hierarchical-cluster-engine.com/
5 @copyright: Copyright © 2013-2014 IOIX Ukraine
6 @license: http://hierarchical-cluster-engine.com/license/
7 @since: 0.1
8 '''
9 
10 
11 import logging
12 from app.BaseServerManager import BaseServerManager
13 from Constants import EVENT_TYPES, LOGGER_NAME
14 from EventObjects import GeneralResponse
15 from EventObjects import DeleteTask
16 import transport.Consts as consts
17 
18 logger = logging.getLogger(LOGGER_NAME)
19 
20 CONFIG_SECTION = "ClientInterfaceService"
21 
22 
25 
26  SERVER_TCP = "server_tcp"
27  SERVER_INPROC = "server"
28  CONFIG_SERVER_HOST = "serverHost"
29  CONFIG_SERVER_PORT = "serverPort"
30  TASKS_MANAGER_CLIENT = "clientTasksManager"
31  EXECUTION_ENVIRONMENT_MANAGER_CLINET = "clientExecutionEnvironmentManager"
32 
33 
38  def __init__(self, configParser, connectBuilderLight):
39  '''
40  Constructor
41  '''
42  super(ClientInterfaceService, self).__init__()
43 
44  self.beforeStop = False
45  self.cfg_section = self.__class__.__name__
46 
47  serverAddrInproc = configParser.get(self.cfg_section, self.SERVER_INPROC)
48  serverHost = configParser.get(CONFIG_SECTION, self.CONFIG_SERVER_HOST)
49  serverPort = configParser.get(CONFIG_SECTION, self.CONFIG_SERVER_PORT)
50  serverAddrTcp = serverHost + ":" + str(serverPort)
51  clientTaskManager = configParser.get(CONFIG_SECTION, self.TASKS_MANAGER_CLIENT)
52  clientExecutionEnvironmentManager = configParser.get(CONFIG_SECTION, self.EXECUTION_ENVIRONMENT_MANAGER_CLINET)
53 
54  serverConnectionInproc = connectBuilderLight.build(consts.SERVER_CONNECT, serverAddrInproc)
55  serverConnectionTcp = connectBuilderLight.build(consts.SERVER_CONNECT, serverAddrTcp, consts.TCP_TYPE)
56  taskManagerConnection = connectBuilderLight.build(consts.CLIENT_CONNECT, clientTaskManager)
57  eeManagerConnection = connectBuilderLight.build(consts.CLIENT_CONNECT, clientExecutionEnvironmentManager)
58 
59  self.addConnection(self.SERVER_TCP, serverConnectionTcp)
60  self.addConnection(self.SERVER_INPROC, serverConnectionInproc)
61  self.addConnection(self.TASKS_MANAGER_CLIENT, taskManagerConnection)
62  self.addConnection(self.EXECUTION_ENVIRONMENT_MANAGER_CLINET, eeManagerConnection)
63 
64  self.setEventHandler(EVENT_TYPES.NEW_TASK, self.onTaskManagerRoute)
65  self.setEventHandler(EVENT_TYPES.UPDATE_TASK, self.onTaskManagerRoute)
66  self.setEventHandler(EVENT_TYPES.GET_TASK_STATUS, self.onTaskManagerRoute)
67  self.setEventHandler(EVENT_TYPES.FETCH_RESULTS_CACHE, self.onTaskManagerRoute)
68  self.setEventHandler(EVENT_TYPES.DELETE_TASK, self.onTaskManagerRoute)
69  self.setEventHandler(EVENT_TYPES.UPDATE_TASK_FIELDS, self.onTaskManagerRoute)
70  self.setEventHandler(EVENT_TYPES.FETCH_AVAILABLE_TASK_IDS, self.onTaskManagerRoute)
71 
72  self.setEventHandler(EVENT_TYPES.CHECK_TASK_STATE, self.onEEManagerRoute)
73  self.setEventHandler(EVENT_TYPES.FETCH_TASK_RESULTS, self.onEEManagerRoute)
74  self.setEventHandler(EVENT_TYPES.DELETE_TASK_RESULTS, self.onEEManagerRoute)
75 
76  self.setEventHandler(EVENT_TYPES.NEW_TASK_RESPONSE, self.onDTMClientRoute)
77  self.setEventHandler(EVENT_TYPES.UPDATE_TASK_RESPONSE, self.onDTMClientRoute)
78  self.setEventHandler(EVENT_TYPES.CHECK_TASK_STATE_RESPONSE, self.onDTMClientRoute)
79  self.setEventHandler(EVENT_TYPES.GET_TASK_STATUS_RESPONSE, self.onDTMClientRoute)
80  self.setEventHandler(EVENT_TYPES.FETCH_TASK_RESULTS_RESPONSE, self.onDTMClientRoute)
81  self.setEventHandler(EVENT_TYPES.DELETE_TASK_RESPONSE, self.onDTMClientRoute)
82  self.setEventHandler(EVENT_TYPES.DELETE_TASK_RESULTS_RESPONSE, self.onDTMClientRoute)
83  self.setEventHandler(EVENT_TYPES.UPDATE_TASK_FIELDS_RESPONSE, self.onDTMClientRoute)
84  self.setEventHandler(EVENT_TYPES.AVAILABLE_TASK_IDS_RESPONSE, self.onDTMClientRoute)
85 
86  #map of incoming event, which are in processing
87  # event.uid => event without eventObj field
88  self.processEvents = dict()
89 
90 
91 
94  def onTaskManagerRoute(self, event):
95  if not self.beforeStop:
96  if event.eventType == EVENT_TYPES.DELETE_TASK and event.eventObj.deleteTaskId == DeleteTask.GROUP_DELETE:
97  self.beforeStop = True
98  self.send(self.TASKS_MANAGER_CLIENT, event)
99  self.registreEvent(event)
100  else:
101  defaultObject = GeneralResponse(GeneralResponse.ERROR_OK, "DTM in prestopped state")
102  responseEvent = self.eventBuilder.build(EVENT_TYPES.GENERAL_RESPONSE, defaultObject)
103  self.reply(event, responseEvent)
104 
105 
106 
107 
110  def onEEManagerRoute(self, event):
112  self.registreEvent(event)
113 
114 
115 
118  def onDTMClientRoute(self, event):
119  try:
120  request_event = self.getRequestEvent(event)
121  self.reply(request_event, event)
122  self.unregisteEvent(request_event)
123  except KeyError as err:
124  logger.error(str(err.message))
125 
126 
127 
130  def registreEvent(self, event):
131  event.eventObj = None
132  self.processEvents[event.uid] = event
133 
134 
135 
138  def getRequestEvent(self, event):
139  return self.processEvents[event.uid]
140 
141 
142 
145  def unregisteEvent(self, event):
146  del self.processEvents[event.uid]
def reply(self, event, reply_event)
wrapper for sending event in reply for event
def getRequestEvent(self, event)
get request event from processEvents map
def onDTMClientRoute(self, event)
handler to route all response event to DTMClient
def registreEvent(self, event)
add event in map of processing events
def onEEManagerRoute(self, event)
handler to route all event to EEManager
GeneralResponse event object, represents general state response for multipurpose usage.
def setEventHandler(self, eventType, handler)
set event handler rewrite the current handler for eventType
def addConnection(self, name, connection)
This is app base class for management server connection end-points and parallel transport messages pr...
def __init__(self, configParser, connectBuilderLight)
constructor initialise all connections and event handlers
def send(self, connect_name, event)
send event
def unregisteEvent(self, event)
delete event in map of processing events
The gateway for dmt client communications.
def onTaskManagerRoute(self, event)
handler to route all event to TaksManager