HCE Project Python language Distributed Tasks Manager Application, Distributed Crawler Application and client API bindings.  2.0.0-chaika
Hierarchical Cluster Engine Python language binding
TasksManager.py
Go to the documentation of this file.
1 '''
2 @package: dc
3 @author igor
4 @link: http://hierarchical-cluster-engine.com/
5 @copyright: Copyright © 2013-2014 IOIX Ukraine
6 @license: http://hierarchical-cluster-engine.com/license/
7 @since: 0.1
8 '''
9 
10 import ConfigParser
11 from contextlib import closing
12 import MySQLdb.cursors
13 import MySQLdb as mdb
14 import dbi.EventObjects as dbi_event
15 import dc.EventObjects as dc_event
16 from dc.Constants import EVENT_TYPES, DRCESyncTasksCover
17 import dc_db.Constants as Constants
18 from dc_db.AttrSetTask import AttrSetTask
19 from dc_db.AttrFetchTask import AttrFetchTask
20 from dc_db.AttrUpdateTask import AttrUpdateTask
21 from dc_db.AttrDeleteTask import AttrDeleteTask
22 from dc_db.ProxyDeleteTask import ProxyDeleteTask
23 from dc_db.ProxyFindTask import ProxyFindTask
24 from dc_db.ProxyNewTask import ProxyNewTask
25 from dc_db.ProxyStatusTask import ProxyStatusTask
26 from dc_db.ProxyUpdateTask import ProxyUpdateTask
27 from dc_db.SiteTask import SiteTask
28 from dc_db.SiteCleanUpTask import SiteCleanUpTask
29 from dc_db.SiteDeleteTask import SiteDeleteTask
30 from dc_db.SiteUpdateTask import SiteUpdateTask
31 from dc_db.SiteStatusTask import SiteStatusTask
32 from dc_db.SiteFindTask import SiteFindTask
33 from dc_db.URLNewTask import URLNewTask
34 from dc_db.URLStatusTask import URLStatusTask
35 from dc_db.URLUpdateTask import URLUpdateTask
36 from dc_db.URLFetchTask import URLFetchTask
37 from dc_db.URLCleanupTask import URLCleanUpTask
38 from dc_db.URLDeleteTask import URLDeleteTask
39 from dc_db.URLContentTask import URLContentTask
40 from dc_db.SQLCustomTask import SQLCustomTask
41 from dc_db.URLPurgeTask import URLPurgeTask
42 from dc_db.FieldRecalculatorTask import FieldRecalculatorTask
43 from dc_db.URLVerifyTask import URLVerifyTask
44 from dc_db.URLAgeTask import URLAgeTask
45 from dc_db.DBDataTask import DBDataTask
46 from dc_db.URLPutTask import URLPutTask
47 from dc_db.URLHistoryTask import URLHistoryTask
48 from dc_db.URLStatsTask import URLStatsTask
49 from dc_db.StatisticLogManager import StatisticLogManager
50 from app.Utils import ExceptionLog
51 import app.Utils as Utils # pylint: disable=F0401
52 
53 logger = Utils.MPLogger().getLogger()
54 
55 # #dict to check event
56 eventCheckDict = dict({EVENT_TYPES.SITE_NEW: dc_event.Site,
57  EVENT_TYPES.SITE_UPDATE: dc_event.SiteUpdate,
58  EVENT_TYPES.SITE_STATUS: dc_event.SiteStatus,
59  EVENT_TYPES.SITE_DELETE: (dc_event.SiteDelete, list),
60  EVENT_TYPES.SITE_CLEANUP: (dc_event.SiteCleanup, list),
61  EVENT_TYPES.SITE_FIND: dc_event.SiteFind,
62  EVENT_TYPES.URL_NEW: list,
63  EVENT_TYPES.URL_UPDATE: list,
64  EVENT_TYPES.URL_STATUS: list,
65  EVENT_TYPES.URL_DELETE: list,
66  EVENT_TYPES.URL_FETCH: list,
67  EVENT_TYPES.URL_CLEANUP: list,
68  EVENT_TYPES.URL_CONTENT: list,
69  EVENT_TYPES.SQL_CUSTOM: dbi_event.CustomRequest,
70  EVENT_TYPES.URL_PURGE: list,
71  EVENT_TYPES.FIELD_RECALCULATE: list,
72  EVENT_TYPES.URL_VERIFY: list,
73  EVENT_TYPES.URL_AGE: list,
74  EVENT_TYPES.URL_PUT: list,
75  EVENT_TYPES.URL_HISTORY: list,
76  EVENT_TYPES.URL_STATS: list,
77  EVENT_TYPES.PROXY_NEW: list,
78  EVENT_TYPES.PROXY_UPDATE: list,
79  EVENT_TYPES.PROXY_DELETE: list,
80  EVENT_TYPES.PROXY_STATUS: list,
81  EVENT_TYPES.PROXY_FIND: list,
82  EVENT_TYPES.ATTR_SET: list,
83  EVENT_TYPES.ATTR_UPDATE: list,
84  EVENT_TYPES.ATTR_DELETE: list,
85  EVENT_TYPES.ATTR_FETCH: list})
86 
87 # #sql query which checks existence of a table
88 CHECK_TABLE_SQL_TEMPLATE = " SELECT COUNT(*) FROM `sites` WHERE Id = '%s'"
89 
90 class TasksManager(object):
91  # Configuration settings options names
92  DB_HOST = "db_host"
93  DB_PORT = "db_port"
94  DB_USER = "db_user"
95  DB_PWD = "db_pwd"
96  PRIMARY_DB = "primary_db"
97  SECONDARY_DB = "secondary_db"
98  THIRD_DB = "third_db"
99  FOURTH_DB = "fourth_db"
100  FIFTH_DB = "fifth_db"
101  STAT_DB = "stat_db"
102  LOG_DB = "log_db"
103  ATT_DB = "att_db"
104  STAT_DOMAINS_DB = "stat_domains_db"
105  MUTEX_LOCK_TTL = "mutexLockTTL"
106  SITE_TEMPLATE = "dc_site_template"
107  KEY_VALUE_STORAGE_DIR = "key_value_storage_dir"
108  KEY_VALUE_DEFAULT_FILE = "key_value_default_file"
109  RAW_DATA_DIR = "raw_data_dir"
110  CONTENT_STORAGE_TYPE = "content_storage_type"
111  CONTENT_TEMPLATE = "dc_content_template"
112  STAT_TEMPLATE = "dc_statistics_freq_template"
113  LOG_TEMPLATE = "dc_statistics_log_template"
114  ATTR_TEMPLATE = "dc_attribute_template"
115  SITE_PROP_NAME_FREQ = "stats_freq_enabled"
116  SITE_PROP_NAME_LOG = "stats_log_enabled"
117  SQL_OPERATION_LIST = ["update", "insert", "delete", "drop"]
118 
119 
120  # #constructor
121  #
122  # @param configParser config parser object
123  def __init__(self, configParser, dbTaskMode=dc_event.Batch.DB_MODE_RW):
124  self.dbTaskMode = dbTaskMode
125  className = self.__class__.__name__
126  # Get configuration settings
127  dbHost = configParser.get(className, self.DB_HOST)
128  dbPort = int(configParser.get(className, self.DB_PORT))
129  dbUser = configParser.get(className, self.DB_USER)
130  dbPWD = configParser.get(className, self.DB_PWD)
131  dbPrimary = configParser.get(className, self.PRIMARY_DB)
132  dbSecondary = configParser.get(className, self.SECONDARY_DB)
133  dbThird = configParser.get(className, self.THIRD_DB)
134  dbFourth = configParser.get(className, self.FOURTH_DB)
135  dbFifth = configParser.get(className, self.FIFTH_DB)
136  dbStat = configParser.get(className, self.STAT_DB)
137  dbLog = configParser.get(className, self.LOG_DB)
138  dbAtt = configParser.get(className, self.ATT_DB)
139  dbStatDomains = configParser.get(className, self.STAT_DOMAINS_DB)
140 
141  try:
142  self.mutexLockTTL = int(configParser.get(className, self.MUTEX_LOCK_TTL))
143  except Exception:
144  self.mutexLockTTL = Constants.DEFAULT_LOCK_TTL
145  self.contentStorageType = int(configParser.get(className, self.CONTENT_STORAGE_TYPE))
146  self.dbConnections = {}
147  self.dbNames = {}
148 
149  self.dcSiteTemplate = configParser.get(className, self.SITE_TEMPLATE)
150  self.dcStatTemplate = configParser.get(className, self.STAT_TEMPLATE)
151  self.dcLogTemplate = configParser.get(className, self.LOG_TEMPLATE)
152  self.dcAttrTemplate = configParser.get(className, self.ATTR_TEMPLATE)
153  self.keyValueStorageDir = configParser.get(className, self.KEY_VALUE_STORAGE_DIR)
154  self.rawDataDir = configParser.get(className, self.RAW_DATA_DIR)
155  self.keyValueDefaultFile = configParser.get(className, self.KEY_VALUE_DEFAULT_FILE)
156  self.dcContentTemplate = configParser.get(className, self.CONTENT_TEMPLATE)
157  self.dbConnections[Constants.PRIMARY_DB_ID] = mdb.connect(dbHost, dbUser, dbPWD, dbPrimary, dbPort, use_unicode=True, charset='utf8')
158  self.dbNames[dbPrimary] = Constants.PRIMARY_DB_ID
159  self.dbConnections[Constants.SECONDARY_DB_ID] = mdb.connect(dbHost, dbUser, dbPWD, dbSecondary, dbPort, use_unicode=True, charset='utf8')
160  self.dbNames[dbSecondary] = Constants.SECONDARY_DB_ID
161  if dbThird is not None and dbThird != '':
162  self.dbConnections[Constants.THIRD_DB_ID] = mdb.connect(dbHost, dbUser, dbPWD, dbThird, dbPort, use_unicode=True, charset='utf8')
163  self.dbNames[dbThird] = Constants.THIRD_DB_ID
164  else:
165  logger.debug(">>> THIRD_DB_ID empty")
166  if dbFourth is not None and dbFourth != '':
167  self.dbConnections[Constants.FOURTH_DB_ID] = mdb.connect(dbHost, dbUser, dbPWD, dbFourth, dbPort, use_unicode=True, charset='utf8')
168  self.dbNames[dbFourth] = Constants.FOURTH_DB_ID
169  else:
170  logger.debug(">>> FOURTH_DB_ID empty")
171  if dbFifth is not None and dbFifth != '':
172  self.dbConnections[Constants.FIFTH_DB_ID] = mdb.connect(dbHost, dbUser, dbPWD, dbFifth, dbPort, use_unicode=True, charset='utf8')
173  self.dbNames[dbFifth] = Constants.FIFTH_DB_ID
174  else:
175  logger.debug(">>> FIFTH_DB_ID empty")
176  self.dbConnections[Constants.STAT_DB_ID] = mdb.connect(dbHost, dbUser, dbPWD, dbStat, dbPort, use_unicode=True, charset='utf8')
177  self.dbNames[dbStat] = Constants.STAT_DB_ID
178  self.dbConnections[Constants.LOG_DB_ID] = mdb.connect(dbHost, dbUser, dbPWD, dbLog, dbPort, use_unicode=True, charset='utf8')
179  self.dbNames[dbLog] = Constants.LOG_DB_ID
180  self.dbConnections[Constants.LOG_DB_ID] = mdb.connect(dbHost, dbUser, dbPWD, dbLog, dbPort, use_unicode=True, charset='utf8')
181  self.dbNames[dbLog] = Constants.LOG_DB_ID
182  self.dbConnections[Constants.ATT_DB_ID] = mdb.connect(dbHost, dbUser, dbPWD, dbAtt, dbPort, use_unicode=True, charset='utf8')
183  self.dbNames[dbAtt] = Constants.ATT_DB_ID
184  self.dbConnections[Constants.STAT_DOMAINS_DB_ID] = mdb.connect(dbHost, dbUser, dbPWD, dbStatDomains, dbPort, use_unicode=True, charset='utf8')
185  self.dbNames[dbStatDomains] = Constants.STAT_DOMAINS_DB_ID
186 
187  self.SQLErrorCode = Constants.EXIT_CODE_OK
188  self.SQLErrorString = ""
190  self.keyValueStorageDir, self.rawDataDir)
191  try:
192  tmpVal = int(configParser.get(className, self.SITE_PROP_NAME_FREQ))
193  StatisticLogManager.GLOBAL_FREQ_ENABLED = bool(tmpVal)
194  except ConfigParser.NoOptionError:
195  pass
196  try:
197  tmpVal = int(configParser.get(className, self.SITE_PROP_NAME_LOG))
198  StatisticLogManager.GLOBAL_LOG_ENABLED = bool(tmpVal)
199  except ConfigParser.NoOptionError:
200  pass
201 
202 
203  # #destructor
204  #
205  def __del__(self):
206  for dbConnect in self.dbConnections.values():
207  if dbConnect is not None:
208  dbConnect.close()
209 
210 
211  # #process input event
212  #
213  # @param drceSyncTasksCover instance of DRCESyncTasksCover
214  # @return generalResponse instance of GeneralResponse object
215  def process(self, drceSyncTasksCover):
216  logger.info("Request eventType: %s, eventObject type: %s",
217  str(drceSyncTasksCover.eventType), str(type(drceSyncTasksCover.eventObject)))
218  if isinstance(drceSyncTasksCover.eventObject, list):
219  if len(drceSyncTasksCover.eventObject) > 0:
220  itemType = str(drceSyncTasksCover.eventObject[0])
221  else:
222  itemType = ""
223  logger.info("Request eventObject items: %s, item type: %s", str(len(drceSyncTasksCover.eventObject)), itemType)
224  # if isinstance(drceSyncTasksCover.eventObject, dc_event.URLFetch):
225  # logger.debug('Event object: %s', varDump(drceSyncTasksCover.eventObject))
226 
227  self.checkInputData(drceSyncTasksCover)
228  responseObject = None
229  responseEvent = None
230  if drceSyncTasksCover.eventType == EVENT_TYPES.SITE_DELETE:
231  siteDeleteTask = SiteDeleteTask(self.keyValueStorageDir, self.rawDataDir, self.dBDataTask)
232  responseObject = siteDeleteTask.process(drceSyncTasksCover.eventObject, self.executeQuery)
233  responseEvent = EVENT_TYPES.SITE_DELETE_RESPONSE
234  self.SQLErrorCode = Constants.EXIT_CODE_OK
235 
236  elif drceSyncTasksCover.eventType == EVENT_TYPES.SITE_NEW:
237  siteTask = SiteTask(self.dcSiteTemplate, self.keyValueDefaultFile, self.keyValueStorageDir, self.dBDataTask,
238  self.dcStatTemplate, self.dcLogTemplate, self.dcAttrTemplate, self)
239  responseObject = siteTask.process(drceSyncTasksCover.eventObject, self.executeQuery)
240  responseEvent = EVENT_TYPES.SITE_NEW_RESPONSE
241 
242  elif drceSyncTasksCover.eventType == EVENT_TYPES.SITE_UPDATE:
243  siteUpdateTask = SiteUpdateTask(self.dcSiteTemplate, self.keyValueDefaultFile, self.keyValueStorageDir,
244  self.rawDataDir, self.dBDataTask, self.dcStatTemplate, self.dcLogTemplate,
245  self.dcAttrTemplate)
246  responseObject = siteUpdateTask.process(drceSyncTasksCover.eventObject, self.executeQuery)
247  responseEvent = EVENT_TYPES.SITE_UPDATE_RESPONSE
248 
249  elif drceSyncTasksCover.eventType == EVENT_TYPES.SITE_CLEANUP:
250  siteCleanUpTask = SiteCleanUpTask(self.keyValueStorageDir, self.rawDataDir, self.dBDataTask)
251  responseObject = siteCleanUpTask.process(drceSyncTasksCover.eventObject, self.executeQuery)
252  responseEvent = EVENT_TYPES.SITE_CLEANUP_RESPONSE
253  self.SQLErrorCode = Constants.EXIT_CODE_OK
254 
255  elif drceSyncTasksCover.eventType == EVENT_TYPES.SITE_STATUS:
256  siteStatusTask = SiteStatusTask()
257  responseObject = siteStatusTask.process(drceSyncTasksCover.eventObject, self.executeQuery) # pylint:disable=R0204
258  responseEvent = EVENT_TYPES.SITE_STATUS_RESPONSE
259 
260  # find site by root url and return list of siteId's with root urls
261  elif drceSyncTasksCover.eventType == EVENT_TYPES.SITE_FIND:
262  siteFindTask = SiteFindTask(self.dcSiteTemplate, self.keyValueDefaultFile, self.keyValueStorageDir,
263  self.dBDataTask, self.dcStatTemplate, self.dcLogTemplate, self.dcAttrTemplate)
264  responseObject = siteFindTask.process(drceSyncTasksCover.eventObject, self.executeQuery)
265  responseEvent = EVENT_TYPES.SITE_FIND_RESPONSE
266 
267  elif drceSyncTasksCover.eventType == EVENT_TYPES.URL_NEW:
268  urlNewTask = URLNewTask(self.keyValueStorageDir, self.rawDataDir, self.dBDataTask)
269  urlNewTask.siteTask = SiteTask(self.dcSiteTemplate, self.keyValueDefaultFile, self.keyValueStorageDir,
270  self.dBDataTask, self.dcStatTemplate, self.dcLogTemplate, self.dcAttrTemplate)
271  responseObject = urlNewTask.process(drceSyncTasksCover.eventObject, self.executeQuery)
272  responseEvent = EVENT_TYPES.URL_NEW_RESPONSE
273 
274  elif drceSyncTasksCover.eventType == EVENT_TYPES.URL_STATUS:
275  urlStatusTask = URLStatusTask()
276  responseObject = urlStatusTask.process(drceSyncTasksCover.eventObject, self.executeQuery)
277  responseEvent = EVENT_TYPES.URL_STATUS_RESPONSE
278 
279  elif drceSyncTasksCover.eventType == EVENT_TYPES.URL_UPDATE:
280  urlUpdateTask = URLUpdateTask(self.keyValueStorageDir, self.rawDataDir, self.dBDataTask)
281  responseObject = urlUpdateTask.process(drceSyncTasksCover.eventObject, self.executeQuery)
282  responseEvent = EVENT_TYPES.URL_UPDATE_RESPONSE
283 
284  elif drceSyncTasksCover.eventType == EVENT_TYPES.URL_FETCH:
285  urlFetchTask = URLFetchTask(self.keyValueStorageDir, self.rawDataDir, self.dBDataTask, self.dcSiteTemplate,
287  responseObject = urlFetchTask.process(drceSyncTasksCover.eventObject, self.executeQuery)
288  responseEvent = EVENT_TYPES.URL_FETCH_RESPONSE
289 
290  elif drceSyncTasksCover.eventType == EVENT_TYPES.URL_CONTENT:
291  urlContentTask = URLContentTask(self.keyValueStorageDir, self.rawDataDir, self.dBDataTask, self.dcSiteTemplate,
293  responseObject = urlContentTask.process(drceSyncTasksCover.eventObject, self.executeQuery)
294  responseEvent = EVENT_TYPES.URL_CONTENT_RESPONSE
295 
296  elif drceSyncTasksCover.eventType == EVENT_TYPES.URL_DELETE:
297  urlDeleteTask = URLDeleteTask(self.keyValueStorageDir, self.rawDataDir, self.dBDataTask)
298  responseObject = urlDeleteTask.process(drceSyncTasksCover.eventObject, self.executeQuery)
299  responseEvent = EVENT_TYPES.URL_DELETE_RESPONSE
300 
301  elif drceSyncTasksCover.eventType == EVENT_TYPES.URL_CLEANUP:
302  urlCleanUpTask = URLCleanUpTask(self.keyValueStorageDir, self.rawDataDir, self.dBDataTask)
303  responseObject = urlCleanUpTask.process(drceSyncTasksCover.eventObject, self.executeQuery)
304  responseEvent = EVENT_TYPES.URL_CLEANUP_RESPONSE
305 
306  elif drceSyncTasksCover.eventType == EVENT_TYPES.SQL_CUSTOM:
307  sqlCustomTask = SQLCustomTask()
308  responseObject = sqlCustomTask.process(drceSyncTasksCover.eventObject, self.executeQuery, self.backDBResolve)
309  responseEvent = EVENT_TYPES.SQL_CUSTOM_RESPONSE
310 
311  elif drceSyncTasksCover.eventType == EVENT_TYPES.URL_PURGE:
312  urlPurgeTask = URLPurgeTask(self.keyValueStorageDir, self.rawDataDir, self.dBDataTask)
313  responseObject = urlPurgeTask.process(drceSyncTasksCover.eventObject, self.executeQuery)
314  responseEvent = EVENT_TYPES.URL_PURGE_RESPONSE
315 
316  elif drceSyncTasksCover.eventType == EVENT_TYPES.FIELD_RECALCULATE:
317  fieldRecalculatorTask = FieldRecalculatorTask()
318  responseObject = fieldRecalculatorTask.process(drceSyncTasksCover.eventObject, self.executeQuery)
319  responseEvent = EVENT_TYPES.FIELD_RECALCULATE_RESPONSE
320 
321  elif drceSyncTasksCover.eventType == EVENT_TYPES.URL_VERIFY:
322  urlVarifyTask = URLVerifyTask()
323  responseObject = urlVarifyTask.process(drceSyncTasksCover.eventObject, self.executeQuery, self.backDBResolve)
324  responseEvent = EVENT_TYPES.URL_VERIFY_RESPONSE
325 
326  elif drceSyncTasksCover.eventType == EVENT_TYPES.URL_AGE:
327  urlAgeTask = URLAgeTask(self.keyValueStorageDir, self.rawDataDir, self.backDBResolve)
328  responseObject = urlAgeTask.process(drceSyncTasksCover.eventObject, self.executeQuery)
329  responseEvent = EVENT_TYPES.URL_AGE_RESPONSE
330 
331  elif drceSyncTasksCover.eventType == EVENT_TYPES.URL_PUT:
332  urlPutTask = URLPutTask(self.keyValueStorageDir, self.rawDataDir, self.dBDataTask)
333  responseObject = urlPutTask.process(drceSyncTasksCover.eventObject, self.executeQuery)
334  responseEvent = EVENT_TYPES.URL_PUT_RESPONSE
335 
336  elif drceSyncTasksCover.eventType == EVENT_TYPES.URL_HISTORY:
337  urlHistoryTask = URLHistoryTask(self.keyValueStorageDir, self.rawDataDir, self.dBDataTask)
338  responseObject = urlHistoryTask.process(drceSyncTasksCover.eventObject, self.executeQuery)
339  responseEvent = EVENT_TYPES.URL_HISTORY_RESPONSE
340 
341  elif drceSyncTasksCover.eventType == EVENT_TYPES.URL_STATS:
342  urlStatsTask = URLStatsTask(self.keyValueStorageDir, self.rawDataDir, self.dBDataTask)
343  responseObject = urlStatsTask.process(drceSyncTasksCover.eventObject, self.executeQuery)
344  responseEvent = EVENT_TYPES.URL_STATS_RESPONSE
345 
346  elif drceSyncTasksCover.eventType == EVENT_TYPES.PROXY_NEW:
347  proxyNewTask = ProxyNewTask()
348  responseObject = proxyNewTask.process(drceSyncTasksCover.eventObject, self.executeQuery)
349  responseEvent = EVENT_TYPES.PROXY_NEW_RESPONSE
350 
351  elif drceSyncTasksCover.eventType == EVENT_TYPES.PROXY_UPDATE:
352  proxyUpdateTask = ProxyUpdateTask()
353  responseObject = proxyUpdateTask.process(drceSyncTasksCover.eventObject, self.executeQuery)
354  responseEvent = EVENT_TYPES.PROXY_UPDATE_RESPONSE
355 
356  elif drceSyncTasksCover.eventType == EVENT_TYPES.PROXY_DELETE:
357  proxyDeleteTask = ProxyDeleteTask()
358  responseObject = proxyDeleteTask.process(drceSyncTasksCover.eventObject, self.executeQuery)
359  responseEvent = EVENT_TYPES.PROXY_DELETE_RESPONSE
360 
361  elif drceSyncTasksCover.eventType == EVENT_TYPES.PROXY_STATUS:
362  proxyStatusTask = ProxyStatusTask()
363  responseObject = proxyStatusTask.process(drceSyncTasksCover.eventObject, self.executeQuery)
364  responseEvent = EVENT_TYPES.PROXY_STATUS_RESPONSE
365 
366  elif drceSyncTasksCover.eventType == EVENT_TYPES.PROXY_FIND:
367  proxyFindTask = ProxyFindTask()
368  responseObject = proxyFindTask.process(drceSyncTasksCover.eventObject, self.executeQuery)
369  responseEvent = EVENT_TYPES.PROXY_FIND_RESPONSE
370 
371  elif drceSyncTasksCover.eventType == EVENT_TYPES.ATTR_SET:
372  attrSetTask = AttrSetTask()
373  responseObject = attrSetTask.process(drceSyncTasksCover.eventObject, self.executeQuery)
374  responseEvent = EVENT_TYPES.ATTR_SET_RESPONSE
375 
376  elif drceSyncTasksCover.eventType == EVENT_TYPES.ATTR_UPDATE:
377  attrUpdateTask = AttrUpdateTask()
378  responseObject = attrUpdateTask.process(drceSyncTasksCover.eventObject, self.executeQuery)
379  responseEvent = EVENT_TYPES.ATTR_UPDATE_RESPONSE
380 
381  elif drceSyncTasksCover.eventType == EVENT_TYPES.ATTR_DELETE:
382  attrDeleteTask = AttrDeleteTask()
383  responseObject = attrDeleteTask.process(drceSyncTasksCover.eventObject, self.executeQuery)
384  responseEvent = EVENT_TYPES.ATTR_DELETE_RESPONSE
385 
386  elif drceSyncTasksCover.eventType == EVENT_TYPES.ATTR_FETCH:
387  attrFetchTask = AttrFetchTask()
388  responseObject = attrFetchTask.process(drceSyncTasksCover.eventObject, self.executeQuery)
389  responseEvent = EVENT_TYPES.ATTR_FETCH_RESPONSE
390 
391  return self.createDRCESyncTasksCover(responseEvent, responseObject)
392 
393 
394  # #executeQuery common entry point for SQL execution
395  #
396  # @param query query for execution
397  # @param dbConnectionName MySQL connection Id
398  # @executeType executeType - return set type
399  # @returns fetching data or None
400  def executeQuery(self, query, dbConnectionName, executeType=Constants.EXEC_INDEX, SQLErrorClear=False):
401  ret = None
402  if executeType == Constants.EXEC_INDEX:
403  ret = self.executeQueryByIndex(query, dbConnectionName, SQLErrorClear)
404  elif executeType == Constants.EXEC_NAME:
405  ret = self.executeQueryByName(query, dbConnectionName, SQLErrorClear)
406  return ret
407 
408 
409  # #method performs common sql error handling
410  #
411  # @param err SQL exception class instance
412  # @param dbConnectionName incoming dbConnection name
413  # @param SQLErrorClear mean set errorCode or not
414  def commonQueryErrorHandler(self, err, dbConnection, SQLErrorClear=False):
415  dbConnection.rollback()
416  self.SQLErrorString = "%s %s" % (err.args[0], err.args[1])
417  if not SQLErrorClear:
418  ExceptionLog.handler(logger, err, self.SQLErrorString)
419  self.SQLErrorCode = Constants.EXIT_CODE_MYSQL_ERROR
420  else:
421  logger.debug(self.SQLErrorString)
422 
423 
424  # #helper check sql query by allowed prefix in ReadOnly db-task mode
425  #
426  # @param query query for execution
427  # @return bool value
428  def checkQueryInReadOnly(self, query):
429  ret = True
430  if self.dbTaskMode & dc_event.Batch.DB_MODE_W == 0:
431  low_query = query.lower()
432  for elem in self.SQL_OPERATION_LIST:
433  if low_query.startswith(elem):
434  logger.debug(">>> QUERY = " + query)
435  logger.debug(">>> NOT SUPPORT in read only mode")
436  ret = False
437  break
438  return ret
439 
440 
441  # #helper function for correct query execution
442  #
443  # @param query query for execution
444  # @param dbConnectionName incoming dbConnection name
445  # @param SQLErrorClear mean set errorCode or not
446  def executeQueryByIndex(self, query, dbConnectionName, SQLErrorClear=False):
447  ret = None
448  if self.checkQueryInReadOnly(query):
449  dbConnection = None
450  if dbConnectionName in self.dbConnections:
451  dbConnection = self.dbConnections[dbConnectionName]
452  if dbConnection:
453  try:
454  with closing(dbConnection.cursor()) as cursor:
455  # logger.debug(query)
456  cursor.execute(query)
457  dbConnection.commit()
458  ret = cursor.fetchall()
459  except mdb.Error as err: # pylint: disable=E1101
460  self.commonQueryErrorHandler(err, dbConnection, SQLErrorClear)
461  else:
462  logger.debug(">>> dbConnection Not Found = " + str(dbConnectionName))
463  return ret
464 
465 
466  # #helper function for correct query execution
467  #
468  # @param query query for execution
469  # @param dbConnectionName incoming dbConnection name
470  # @param SQLErrorClear mean set errorCode or not
471  def executeQueryByName(self, query, dbConnectionName, SQLErrorClear=False):
472  ret = None
473  if self.checkQueryInReadOnly(query):
474  dbConnection = None
475  if dbConnectionName in self.dbConnections:
476  dbConnection = self.dbConnections[dbConnectionName]
477  if dbConnection:
478  try:
479  with closing(dbConnection.cursor(MySQLdb.cursors.DictCursor)) as cursor:
480  # logger.debug(query)
481  cursor.execute(query)
482  dbConnection.commit()
483  ret = cursor.fetchall()
484  except mdb.Error as err: # pylint: disable=E1101
485  self.commonQueryErrorHandler(err, dbConnection, SQLErrorClear)
486  else:
487  logger.debug(">>> dbConnection Not Found = " + str(dbConnectionName))
488  return ret
489 
490 
491  # #check conformity data in input DRCESyncTasksCover object
492  #
493  # @param drceSyncTasksCover instance of DRCESyncTasksCover
494  def checkInputData(self, drceSyncTasksCover):
495  for inputType in eventCheckDict:
496  if drceSyncTasksCover.eventType is inputType:
497  if isinstance(eventCheckDict[inputType], tuple):
498  for localType in eventCheckDict[inputType]:
499  if isinstance(drceSyncTasksCover.eventObject, localType):
500  return True
501  elif isinstance(drceSyncTasksCover.eventObject, eventCheckDict[inputType]):
502  return True
503  raise Exception("Incorrect types of input data")
504 
505 
506  # #helper function to build response as DRCESyncTasksCover object
507  #
508  # @param eventType type of response event
509  # @param eventObject instance of returning object
510  def createDRCESyncTasksCover(self, eventType, eventObject):
511  return DRCESyncTasksCover(eventType, eventObject)
512 
513 
514  # #gets dbName and returns internar db index
515  #
516  # @param dbName db name
517  def backDBResolve(self, dbName):
518  ret = None
519  if dbName in self.dbNames:
520  ret = self.dbNames[dbName]
521  return ret
522 
def checkInputData(self, drceSyncTasksCover)
def executeQueryByName(self, query, dbConnectionName, SQLErrorClear=False)
def __init__(self, configParser, dbTaskMode=dc_event.Batch.DB_MODE_RW)
def createDRCESyncTasksCover(self, eventType, eventObject)
def executeQueryByIndex(self, query, dbConnectionName, SQLErrorClear=False)
def executeQuery(self, query, dbConnectionName, executeType=Constants.EXEC_INDEX, SQLErrorClear=False)
def checkQueryInReadOnly(self, query)
def backDBResolve(self, dbName)
def commonQueryErrorHandler(self, err, dbConnection, SQLErrorClear=False)
def process(self, drceSyncTasksCover)