HCE Project Python language Distributed Tasks Manager Application, Distributed Crawler Application and client API bindings.  2.0.0-chaika
Hierarchical Cluster Engine Python language binding
DTMC.py
Go to the documentation of this file.
1 '''
2 Created on Mar 19, 2014
3 
4 @package: dtm
5 @author: scorp
6 @link: http://hierarchical-cluster-engine.com/
7 @copyright: Copyright © 2013-2014 IOIX Ukraine
8 @license: http://hierarchical-cluster-engine.com/license/
9 @since: 0.1
10 '''
11 
12 
13 import logging
14 import logging.config
15 from DTMCObjectsSerializator import DTMCObjectsSerializator
16 from cement.core import foundation
17 from transport.ConnectionBuilderLight import ConnectionBuilderLight
18 from transport.Event import EventBuilder
19 from dtm.Constants import EVENT_TYPES as EVENT_TYPES
20 from app.Utils import JsonSerializable
21 import app.Exceptions as Exceptions
22 import transport.Consts
23 import dtm.EventObjects
24 import ConfigParser
25 import Constants as CONSTANTS
26 import json
27 import sys
28 import app.Utils as Utils
29 import app.Consts as APP_CONSTS
30 from app.Utils import ExceptionLog
31 
32 # Logger initialization
33 logger = logging.getLogger(APP_CONSTS.LOGGER_NAME)
34 #logger = logging.getLogger(CONSTANTS.APP_NAME)
35 
36 
37 
39 class DTMC(foundation.CementApp):
40 
41  class Meta: # pylint: disable=W0232, C1001
42  label = CONSTANTS.APP_NAME
43 
44 
45 
47  def __init__(self, connectionBuilder=None):
48  foundation.CementApp.__init__(self)
49  self.config = ConfigParser.ConfigParser()
50  self.connectionBuilder = connectionBuilder
51  self.localConnection = None
54  self.errorCode = CONSTANTS.ERROR_NOERROR
55  self.errorStr = ""
56 
57 
58 
62  def fillError(self, errorStr, errorCode, isLogging=True):
63  self.errorCode = errorCode
64  self.errorStr = errorStr
65  if isLogging:
66  logger.error(self.errorStr)
67 
68 
69 
71  def connectionInit(self):
72  if self.connectionBuilder == None:
74  host = str(self.config.get(CONSTANTS.APP_NAME, CONSTANTS.DTM_HOST))
75  if len(host) > 1 and host[0] == "\"":
76  host = host[1:]
77  if host[-1] == "\"":
78  host = host[0:-1]
79  port = str(self.config.get(CONSTANTS.APP_NAME, CONSTANTS.DTM_PORT))
80  addr = host + ":" + port
81  self.localConnection = self.connectionBuilder.build(transport.Consts.CLIENT_CONNECT, addr,
82  transport.Consts.TCP_TYPE)
83 
84 
85 
90  def taskProcessingDeserialize(self, task, fileName, task_id):
91  ffile = open(fileName, "r")
92  data = ffile.read()
93  ffile.close()
94  jsonData = json.loads(data)
96  eventObj = None
97  if task == CONSTANTS.TASKS[0]:
98  eventObj = self.dTMCObjectsSerializator.newDeserialize(jsonData)
99  if task_id != None:
100  eventObj.id = task_id
101  elif task == CONSTANTS.TASKS[1]:
102  eventObj = self.dTMCObjectsSerializator.checkDeserialize(jsonData)
103  elif task == CONSTANTS.TASKS[2]:
104  eventObj = self.dTMCObjectsSerializator.terminateDeserialize(jsonData)
105  elif task == CONSTANTS.TASKS[3]:
106  eventObj = self.dTMCObjectsSerializator.getDeserialize(jsonData)
107  elif task == CONSTANTS.TASKS[4]:
108  eventObj = self.dTMCObjectsSerializator.statusDeserialize(jsonData)
109  elif task == CONSTANTS.TASKS[5]:
110  eventObj = self.dTMCObjectsSerializator.cleanupDeserialize(jsonData)
111  elif task == CONSTANTS.TASKS[6]:
112  eventObj = self.dTMCObjectsSerializator.getTasksDeserialize(jsonData)
113  return eventObj
114 
115 
116 
119  def generateEmptyResponse(self, task):
120  obj = None
121  jsonString = None
122  if task == None:
124  elif task == CONSTANTS.TASKS[0]:
126  elif task == CONSTANTS.TASKS[1]:
128  elif task == CONSTANTS.TASKS[2]:
130  elif task == CONSTANTS.TASKS[3]:
132  elif task == CONSTANTS.TASKS[4]:
133  obj = []
134  elif task == CONSTANTS.TASKS[5]:
136  elif task == CONSTANTS.TASKS[6]:
138  else:
140  self.errorCode = CONSTANTS.ERROR_UNKNOWN_TASK
141  self.errorStr = CONSTANTS.ERROR_STR14
142  if type(obj) == type([]):
143  jsonString = json.dumps(obj)
144  else:
145  obj.errorCode = self.errorCode
146  obj.errorMessage = self.errorStr
147  jsonString = obj.toJSON()
148  return jsonString
149 
150 
151 
155  def taskProcessingSerialize(self, task, eventObj):
156  jsonString = None
157  eventObjClassName = eventObj.__class__.__name__
158  if task == CONSTANTS.TASKS[0]:
159  if eventObjClassName != dtm.EventObjects.GeneralResponse().__class__.__name__:
160  raise Exceptions.WrongEventObjectTypeException(CONSTANTS.ERROR_STR8.format(task, "GeneralResponse"))
161  elif task == CONSTANTS.TASKS[1]:
162  if eventObjClassName != dtm.EventObjects.EEResponseData(0).__class__.__name__:
163  raise Exceptions.WrongEventObjectTypeException(CONSTANTS.ERROR_STR8.format(task, "EEResponseData"))
164  elif task == CONSTANTS.TASKS[2]:
165  if eventObjClassName != dtm.EventObjects.GeneralResponse().__class__.__name__:
166  raise Exceptions.WrongEventObjectTypeException(CONSTANTS.ERROR_STR8.format(task, "GeneralResponse"))
167  elif task == CONSTANTS.TASKS[3]:
168  if eventObjClassName != dtm.EventObjects.EEResponseData(0).__class__.__name__:
169  raise Exceptions.WrongEventObjectTypeException(CONSTANTS.ERROR_STR8.format(task, "EEResponseData"))
170  elif task == CONSTANTS.TASKS[4]:
171  if eventObjClassName != [].__class__.__name__:
172  raise Exceptions.WrongEventObjectTypeException(CONSTANTS.ERROR_STR8.format(task, "list[]"))
173  i = 0
174  for listElement in eventObj:
175  i += 1
176  eventObjClassName = listElement.__class__.__name__
177  #TODO: replace with isinstance() usage
178  if eventObjClassName != dtm.EventObjects.TaskManagerFields(0).__class__.__name__:
179  raise Exceptions.WrongEventObjectTypeException(CONSTANTS.ERROR_STR8.format(task,
180  "list(TaskManagerFields)[" + str(i) + "]"))
181  elif task == CONSTANTS.TASKS[5]:
182  if eventObjClassName != dtm.EventObjects.GeneralResponse().__class__.__name__:
183  raise Exceptions.WrongEventObjectTypeException(CONSTANTS.ERROR_STR8.format(task, "GeneralResponse"))
184  elif task == CONSTANTS.TASKS[6]:
185  if eventObjClassName != dtm.EventObjects.AvailableTaskIds(0).__class__.__name__:
186  raise Exceptions.WrongEventObjectTypeException(CONSTANTS.ERROR_STR8.format(task, "AvailableTaskIds"))
187  if type(eventObj) == type([]):
188  jsonString = json.dumps(eventObj, default=JsonSerializable.json_serial, sort_keys=True, indent=4)
189  else:
190  try:
191  jsonString = eventObj.toJSON()
192  except UnicodeDecodeError, err:
193  ExceptionLog.handler(logger, err, "<-------------- DECODE Error ---------------->", (eventObj))
194 
195  return jsonString
196 
197 
198 
202  def transportCommunications(self, task, eventObj):
203  timeout = None
204  retEvent = None
205  ret = None
206  try:
207  timeout = self.config.get(CONSTANTS.APP_NAME, CONSTANTS.TCP_TIMEOUT_CONFIG_NAME)
208  except ConfigParser.NoSectionError:
209  timeout = CONSTANTS.TCP_TIMEOUT
210  except ConfigParser.NoOptionError:
211  timeout = CONSTANTS.TCP_TIMEOUT
212  eventType = None
213  if task == CONSTANTS.TASKS[0]:
214  eventType = EVENT_TYPES.NEW_TASK
215  elif task == CONSTANTS.TASKS[1]:
216  eventType = EVENT_TYPES.CHECK_TASK_STATE
217  elif task == CONSTANTS.TASKS[2]:
218  eventType = EVENT_TYPES.DELETE_TASK
219  elif task == CONSTANTS.TASKS[3]:
220  eventType = EVENT_TYPES.FETCH_TASK_RESULTS
221  elif task == CONSTANTS.TASKS[4]:
222  eventType = EVENT_TYPES.GET_TASK_STATUS
223  elif task == CONSTANTS.TASKS[5]:
224  eventType = EVENT_TYPES.DELETE_TASK_RESULTS
225  elif task == CONSTANTS.TASKS[6]:
226  eventType = EVENT_TYPES.FETCH_AVAILABLE_TASK_IDS
227  event = self.eventBuilder.build(eventType, eventObj)
228  self.localConnection.send(event)
229 
230  if self.localConnection.poll(timeout) == 0:
231  self.fillError(CONSTANTS.ERROR_STR7.format(str(timeout)), CONSTANTS.ERROR_NETWORK)
232  else:
233  retEvent = self.localConnection.recv()
234  if retEvent != None:
235  ret = retEvent.eventObj
236  return ret
237 
238 
239 
241  def configReader(self):
242  configReadList = []
243  if self.pargs.config == None:
244  configReadList = self.config.read(CONSTANTS.DEFAULT_CONFIG_NAME1)
245  if len(configReadList) == 0:
246  configReadList = self.config.read(CONSTANTS.DEFAULT_CONFIG_NAME2)
247  else:
248  configReadList = self.config.read(self.pargs.config)
249  return len(configReadList)
250 
251 
252 
255  def loadLogConfigFile(self):
256  global logger # pylint: disable=W0603
257  try:
258  logIniFileName = self.config.get(CONSTANTS.LOG_CONFIG_SECTION_NAME, CONSTANTS.LOG_CONFIG_OPTION_NAME)
259  if logIniFileName != None:
260  logging.config.fileConfig(logIniFileName)
261  except (ConfigParser.NoSectionError, ConfigParser.NoOptionError):
262  self.errorStr = CONSTANTS.ERROR_STR10
263  self.errorCode = CONSTANTS.ERROR_LOG_SECTION_ERROR
264  except Exception, err:
265  self.errorStr = CONSTANTS.ERROR_STR11 + ': ' + str(err)
266  self.errorCode = CONSTANTS.ERROR_LOG_INIT
267  logger = Utils.MPLogger().getLogger()
268 
269 
270 
272  def setup(self):
273  foundation.CementApp.setup(self)
274 
275 
276 
278  def run(self):
279  foundation.CementApp.run(self)
280  isHelpArg = False
281  if '-h' in self.argv or '--help' in self.argv:
282  isHelpArg = True
283  eventObj = None
284  retEventObj = None
285  jsonBuf = None
286 
287  if self.configReader() > 0:
288  self.loadLogConfigFile()
289 
290  if self.errorCode == CONSTANTS.ERROR_NOERROR:
291  try:
292  self.connectionInit()
293  except (ConfigParser.NoSectionError, ConfigParser.NoOptionError):
294  self.fillError(CONSTANTS.ERROR_STR13, CONSTANTS.ERROR_CONFIG_SECTION)
295  except Exception:
296  self.fillError(CONSTANTS.ERROR_STR12, CONSTANTS.ERROR_CONNECTION)
297  else:
298  if self.localConnection != None:
299  if isHelpArg == False and self.pargs.task == None:
300  self.fillError(CONSTANTS.ERROR_STR1, CONSTANTS.ERROR_ARGS1)
301  elif self.pargs.task != None:
302  if self.pargs.task in CONSTANTS.TASKS:
303  if self.pargs.file != None:
304  try:
305  eventObj = self.taskProcessingDeserialize(self.pargs.task, self.pargs.file, self.pargs.id)
306  except IOError:
307  self.fillError(CONSTANTS.ERROR_STR4, CONSTANTS.ERROR_BAD_FILE_NAME)
308  except ValueError:
309  self.fillError(CONSTANTS.ERROR_STR5, CONSTANTS.ERROR_BAD_JSON)
310  except Exceptions.DeserilizeException as excp:
311  self.fillError(CONSTANTS.ERROR_STR6.format(excp.message), CONSTANTS.ERROR_DTMC)
312  if eventObj != None:
313  retEventObj = self.transportCommunications(self.pargs.task, eventObj)
314  if retEventObj != None:
315  try:
316  jsonBuf = self.taskProcessingSerialize(self.pargs.task, retEventObj)
317  except Exceptions.WrongEventObjectTypeException as excp:
318  self.fillError(excp.message, CONSTANTS.ERROR_WRONG_RESPONSE)
319  else:
320  self.fillError(CONSTANTS.ERROR_STR2, CONSTANTS.ERROR_ARGS2)
321  else:
322  self.fillError(CONSTANTS.ERROR_STR3, CONSTANTS.ERROR_BAD_TASK)
323  else:
324  self.fillError(CONSTANTS.ERROR_STR12, CONSTANTS.ERROR_CONNECTION)
325  else:
326  self.fillError(CONSTANTS.ERROR_STR9, CONSTANTS.ERROR_NO_CONFIG, False)
327 
328  if jsonBuf == None:
329  jsonBuf = self.generateEmptyResponse(self.pargs.task)
330  sys.stdout.write(jsonBuf)
331  # Finish logging
332  logger.info(APP_CONSTS.LOGGER_DELIMITER_LINE)
333 
334 
335 
337  def close(self):
338  foundation.CementApp.close(self)
339 
340 
connectionBuilder
Definition: DTMC.py:50
def generateEmptyResponse(self, task)
generateEmptyResponse method If here was some critical error, we generate empty response here ...
Definition: DTMC.py:119
def __init__(self, connectionBuilder=None)
constructor initialise all class variable and recieve connectionBuilder as param(not mandatory) ...
Definition: DTMC.py:47
def setup(self)
setup method Method calls before run application
Definition: DTMC.py:272
def loadLogConfigFile(self)
load logging load logging configuration (log file, log level, filters)
Definition: DTMC.py:255
GeneralResponse event object, represents general state response for multipurpose usage.
AvailableTaskIds event object, for return all available task id.
dTMCObjectsSerializator
Definition: DTMC.py:53
def connectionInit(self)
connectionInit method initializes internal variables that containts network connections/communication...
Definition: DTMC.py:71
DTMCObjectsSerializator Class contents serialize/deserialize methods for incoming "DTMC" commands...
def configReader(self)
configReader method Method try to read config file by prereared paths, return count of readed configs...
Definition: DTMC.py:241
def taskProcessingDeserialize(self, task, fileName, task_id)
taskProcessingDeserialize method Reads task from file and deserializes it.
Definition: DTMC.py:90
DTMC Class contents main functional of DTMC application, class inherits from foundation.CementApp.
Definition: DTMC.py:39
Class hides routines of bulding connection objects.
def close(self)
close method Method calls after application run
Definition: DTMC.py:337
TaskManagerFields event object, for return task fields values.
def transportCommunications(self, task, eventObj)
taskProcessingDeserialize method Method serializes incoming task to the JSON string task - task arg e...
Definition: DTMC.py:202
localConnection
Definition: DTMC.py:51
def run(self)
run method Method contains main application functionality
Definition: DTMC.py:278
EEResponseData event object, store task results data, returned from EE.
def fillError(self, errorStr, errorCode, isLogging=True)
fillError method calls from error-code point from main processing (...from event handlers) errorStr -...
Definition: DTMC.py:62
def taskProcessingSerialize(self, task, eventObj)
taskProcessingDeserialize method Method serializes incoming task to the JSON string task - task arg e...
Definition: DTMC.py:155