HCE Project Python language Distributed Tasks Manager Application, Distributed Crawler Application and client API bindings.  2.0.0-chaika
Hierarchical Cluster Engine Python language binding
TasksDataManager.py
Go to the documentation of this file.
1 '''
2 Created on Mar 05, 2014
3 
4 @package: dtm
5 @author: scorp
6 @link: http://hierarchical-cluster-engine.com/
7 @copyright: Copyright © 2013-2014 IOIX Ukraine
8 @license: http://hierarchical-cluster-engine.com/license/
9 @since: 0.1
10 '''
11 
12 try:
13  import cPickle as pickle
14 except ImportError:
15  import pickle
16 
17 import ConfigParser
18 import logging
19 
20 import transport.Consts
21 from transport.ConnectionBuilderLight import ConnectionBuilderLight
22 from transport.Event import EventBuilder
23 from app.BaseServerManager import BaseServerManager
24 from app.LogFormatter import LogFormatterEvent
25 from dbi.dbi import DBI
26 from dbi.dbi import DBIErr
27 from dtm.Constants import EVENT_TYPES as EVENT
28 from dtm.EEResponsesTable import EEResponsesTable
29 from dtm.EventObjects import GeneralResponse
30 from dtm.TasksDataTable import TasksDataTable
31 import dtm.Constants as CONSTANTS
32 
33 logger = logging.getLogger(CONSTANTS.LOGGER_NAME)
34 
35 
38 
39 
40  TASK_DATA_MANAGER_SERV_NAME = "Server"
41  TASK_DATA_MANAGER_SERV_CONFIG_NAME = "ServerName"
42 
43 
44 
46  def __init__(self, config, connectionBuilder=None):
47  BaseServerManager.__init__(self)
48  self.moduleName = self.__class__.__name__
49  self.servIndex = 1
51  self.setEventHandler(EVENT.NEW_TASK, self.onNewTask)
52  self.setEventHandler(EVENT.FETCH_TASK_DATA, self.onFetchTask)
53  self.setEventHandler(EVENT.UPDATE_TASK, self.onUpdateTask)
54  self.setEventHandler(EVENT.DELETE_TASK_DATA, self.onDeleteTask)
55  self.setEventHandler(EVENT.INSERT_EE_DATA, self.onInsertEEResponse)
56  self.setEventHandler(EVENT.FETCH_EE_DATA, self.onFetchEEResponse)
57  self.setEventHandler(EVENT.DELETE_EE_DATA, self.onDeleteEEResponse)
58  self.connectionBuilder = connectionBuilder
59  self.config = config
60  self.dbi = None
61  self.connectionInit()
62  self.dbInit()
63 
64 
65 
67  def connectionInit(self):
68  if self.connectionBuilder is None:
70  try:
71  localConnection = self.connectionBuilder.build(transport.Consts.SERVER_CONNECT, \
73  self.addConnection(self.TASK_DATA_MANAGER_SERV_NAME + str(self.servIndex), localConnection)
74  self.servIndex = self.servIndex + 1
75  except ConfigParser.NoSectionError:
76  logger.error(">>> TasksDataManager can't read config - Section Error")
77  except ConfigParser.NoOptionError:
78  logger.error(">>> TasksDataManager can't read config - Option Error")
79 
80 
81 
83  def dbInit(self):
84  if self.config != None:
85  dic = None
86  dic = dict(self.config.items(CONSTANTS.DB_CONFIG_SECTION))
87  if dic != None:
88  self.dbi = DBI(dic)
89 
90 
91 
93  def dbiProcessing(self, data, eventType):
94  # variable for result
95  ret = None
96 
97  if eventType == EVENT.NEW_TASK:
98  self.dbi.insert(data)
99  elif eventType == EVENT.FETCH_TASK_DATA:
100  ret = self.dbi.fetch(data, "id=%s" % data.id)
101  elif eventType == EVENT.UPDATE_TASK:
102  self.dbi.update(data, "id=%s" % data.id)
103  elif eventType == EVENT.DELETE_TASK_DATA:
104  self.dbi.delete(data, "id=%s" % data.id)
105  elif eventType == EVENT.INSERT_EE_DATA:
106  self.dbi.insert(data)
107  elif eventType == EVENT.FETCH_EE_DATA:
108  ret = self.dbi.fetch(data, "id=%s" % data.id)
109  elif eventType == EVENT.DELETE_EE_DATA:
110  self.dbi.delete(data, "id=%s" % data.id)
111 
112  return ret
113 
114 
115 
119  def getResponceEventType(self, eventType):
120  #variable for result
121  retEventType = EVENT.GENERAL_RESPONSE
122 
123  if eventType == EVENT.NEW_TASK:
124  retEventType = EVENT.NEW_TASK_RESPONSE
125  elif eventType == EVENT.FETCH_TASK_DATA:
126  retEventType = EVENT.FETCH_TASK_DATA_RESPONSE
127  elif eventType == EVENT.UPDATE_TASK:
128  retEventType = EVENT.UPDATE_TASK_RESPONSE
129  elif eventType == EVENT.DELETE_TASK_DATA:
130  retEventType = EVENT.DELETE_TASK_DATA_RESPONSE
131  elif eventType == EVENT.INSERT_EE_DATA:
132  retEventType = EVENT.INSERT_EE_DATA_RESPONSE
133  elif eventType == EVENT.FETCH_EE_DATA:
134  retEventType = EVENT.FETCH_EE_DATA_RESPONSE
135  elif eventType == EVENT.DELETE_EE_DATA:
136  retEventType = EVENT.DELETE_EE_DATA_RESPONSE
137 
138  return retEventType
139 
140 
141 
143  def eventProcessing(self, event):
144  serializeStr = pickle.dumps(event.eventObj)
145  if event.eventType == EVENT.NEW_TASK or \
146  event.eventType == EVENT.FETCH_TASK_DATA or \
147  event.eventType == EVENT.UPDATE_TASK or \
148  event.eventType == EVENT.DELETE_TASK_DATA:
149  data = TasksDataTable()
150  else:
151  data = EEResponsesTable() # pylint: disable=R0204
152  data.id = event.eventObj.id
153  data.data = serializeStr
154  dbiRet = None
155  retEventType = None
156  try:
157  retEventType = self.getResponceEventType(event.eventType)
158  dbiRet = GeneralResponse()
159  res = self.dbiProcessing(data, event.eventType)
160  if res is not None and len(res) > 0 and res[0] != None and hasattr(res[0], 'data') and res[0].data != None:
161  dbiRet = pickle.loads(str(res[0].data))
162 
163  except DBIErr as err:
164  logger.error("DB error: %s", str(err))
165  dbiRet.errorCode = err.errCode
166  dbiRet.errorMessage = "Some DB error in TasksDataManager.eventProcessing [" + str(err) + "]"
167  except Exception, err:
168  logger.error("Error: %s", str(err))
169  dbiRet.errorMessage = "Some error in TasksDataManager.eventProcessing [" + str(err) + "]"
170 
171  retEvent = self.eventBuilder.build(retEventType, dbiRet)
172  return retEvent
173 
174 
175 
177  def badEventType(self, msg, event):
178  errorStr = msg + str(event.eventType)
179  logger.error(LogFormatterEvent(event, [], errorStr))
180 # raise Exception(errorStr)
181 
182 
183 
185  def onNewTask(self, event):
186  logger.debug(LogFormatterEvent(event, [], ">>> TasksDataManager [NEW_TASK_REQUEST] Handler start"))
187  if event.eventType != EVENT.NEW_TASK:
188  self.badEventType(">>> Wrong Event type [NEW_TASK_REQUEST] != ", event)
189  self.reply(event, self.eventProcessing(event))
190  logger.debug(LogFormatterEvent(event, [], ">>> TasksDataManager [NEW_TASK_REQUEST] Handler finish"))
191 
192 
193  def onFetchTask(self, event):
194  logger.debug(LogFormatterEvent(event, [], ">>> TasksDataManager [FETCH_TASK_REQUEST] Handler start"))
195  if event.eventType != EVENT.FETCH_TASK_DATA:
196  self.badEventType(">>> Wrong Event type [FETCH_TASK_REQUEST] != ", event)
197  self.reply(event, self.eventProcessing(event))
198  logger.debug(LogFormatterEvent(event, [], ">>> TasksDataManager [FETCH_TASK_REQUEST] Handler finish"))
199 
200 
201  def onUpdateTask(self, event):
202  logger.debug(LogFormatterEvent(event, [], ">>> TasksDataManager [UPDATE_TASK_REQUEST] Handler start"))
203  if event.eventType != EVENT.UPDATE_TASK:
204  self.badEventType(">>> Wrong Event type [UPDATE_TASK_REQUEST] != ", event)
205  self.reply(event, self.eventProcessing(event))
206  logger.debug(LogFormatterEvent(event, [], ">>> TasksDataManager [UPDATE_TASK_REQUEST] Handler finish"))
207 
208 
209  def onDeleteTask(self, event):
210  logger.debug(LogFormatterEvent(event, [], ">>> TasksDataManager [DELETE_TASK_DATA_REQUEST] Handler start"))
211  if event.eventType != EVENT.DELETE_TASK_DATA:
212  self.badEventType(">>> Wrong Event type [DELETE_TASK_DATA_REQUEST] != ", event)
213  self.reply(event, self.eventProcessing(event))
214  logger.debug(LogFormatterEvent(event, [], ">>> TasksDataManager [DELETE_TASK_DATA_REQUEST] Handler finish"))
215 
216 
217  def onInsertEEResponse(self, event):
218  logger.debug(LogFormatterEvent(event, [], ">>> TasksDataManager [INSERT_EE_REQUEST] Handler start"))
219  if event.eventType != EVENT.INSERT_EE_DATA:
220  self.badEventType(">>> Wrong Event type [INSERT_EE_REQUEST] != ", event)
221  self.reply(event, self.eventProcessing(event))
222  logger.debug(LogFormatterEvent(event, [], ">>> TasksDataManager [INSERT_EE_REQUEST] Handler finish"))
223 
224 
225  def onFetchEEResponse(self, event):
226  logger.debug(LogFormatterEvent(event, [], ">>> TasksDataManager [FETCH_EE_REQUEST] Handler start"))
227  if event.eventType != EVENT.FETCH_EE_DATA:
228  self.badEventType(">>> Wrong Event type [FETCH_EE_REQUEST] != ", event)
229  self.reply(event, self.eventProcessing(event))
230  logger.debug(LogFormatterEvent(event, [], ">>> TasksDataManager [FETCH_EE_REQUEST] Handler finish"))
231 
232 
233  def onDeleteEEResponse(self, event):
234  logger.debug(LogFormatterEvent(event, [], ">>> TasksDataManager [DELETE_EE_REQUEST] Handler start"))
235  if event.eventType != EVENT.DELETE_EE_DATA:
236  self.badEventType(">>> Wrong Event type [DELETE_EE_REQUEST] != ", event)
237  self.reply(event, self.eventProcessing(event))
238  logger.debug(LogFormatterEvent(event, [], ">>> TasksDataManager [DELETE_EE_REQUEST] Handler finish"))
def reply(self, event, reply_event)
wrapper for sending event in reply for event
def badEventType(self, msg, event)
eventProcessing method Method contains main error processing for incoming events
def eventProcessing(self, event)
eventProcessing method Method contains main processing with incoming events
Log formatter event, defines the object to format message string.
Definition: LogFormatter.py:16
GeneralResponse event object, represents general state response for multipurpose usage.
def __init__(self, config, connectionBuilder=None)
constructor initialise all class variable and recieve config as param
def setEventHandler(self, eventType, handler)
set event handler rewrite the current handler for eventType
def connectionInit(self)
connectionInit method initializes internal inproc connection
def addConnection(self, name, connection)
This is app base class for management server connection end-points and parallel transport messages pr...
def onNewTask(self, event)
Callbacks methods Event callbacks.
Definition: dbi.py:1
Class hides routines of bulding connection objects.
def dbiProcessing(self, data, eventType)
dbiProcessing method Method contains main processing with database API
Class contents TasksDataManager module implementation.
def getResponceEventType(self, eventType)
Get responce event type method.
def dbInit(self)
dbInit method initializes internal database API