4 @link: http://hierarchical-cluster-engine.com/ 5 @copyright: Copyright © 2013-2014 IOIX Ukraine 6 @license: http://hierarchical-cluster-engine.com/license/ 11 from contextlib
import closing
12 import MySQLdb.cursors
# Expected payload type for every supported request event type.
# Each value is either a single type or a tuple of acceptable types; the
# input validation code tests the request's eventObject against it with
# isinstance() and raises "Incorrect types of input data" on a mismatch.
eventCheckDict = {
    # Site operations
    EVENT_TYPES.SITE_NEW: dc_event.Site,
    EVENT_TYPES.SITE_UPDATE: dc_event.SiteUpdate,
    EVENT_TYPES.SITE_STATUS: dc_event.SiteStatus,
    EVENT_TYPES.SITE_DELETE: (dc_event.SiteDelete, list),
    EVENT_TYPES.SITE_CLEANUP: (dc_event.SiteCleanup, list),
    EVENT_TYPES.SITE_FIND: dc_event.SiteFind,
    # URL operations
    EVENT_TYPES.URL_NEW: list,
    EVENT_TYPES.URL_UPDATE: list,
    EVENT_TYPES.URL_STATUS: list,
    EVENT_TYPES.URL_DELETE: list,
    EVENT_TYPES.URL_FETCH: list,
    EVENT_TYPES.URL_CLEANUP: list,
    EVENT_TYPES.URL_CONTENT: list,
    # Custom SQL request
    EVENT_TYPES.SQL_CUSTOM: dbi_event.CustomRequest,
    # Further URL and field operations
    EVENT_TYPES.URL_PURGE: list,
    EVENT_TYPES.FIELD_RECALCULATE: list,
    EVENT_TYPES.URL_VERIFY: list,
    EVENT_TYPES.URL_AGE: list,
    EVENT_TYPES.URL_PUT: list,
    EVENT_TYPES.URL_HISTORY: list,
    EVENT_TYPES.URL_STATS: list,
    # Proxy operations
    EVENT_TYPES.PROXY_NEW: list,
    EVENT_TYPES.PROXY_UPDATE: list,
    EVENT_TYPES.PROXY_DELETE: list,
    EVENT_TYPES.PROXY_STATUS: list,
    EVENT_TYPES.PROXY_FIND: list,
    # Attribute operations
    EVENT_TYPES.ATTR_SET: list,
    EVENT_TYPES.ATTR_UPDATE: list,
    EVENT_TYPES.ATTR_DELETE: list,
    EVENT_TYPES.ATTR_FETCH: list,
}
88 CHECK_TABLE_SQL_TEMPLATE =
" SELECT COUNT(*) FROM `sites` WHERE Id = '%s'" 96 PRIMARY_DB =
"primary_db" 97 SECONDARY_DB =
"secondary_db" 99 FOURTH_DB =
"fourth_db" 100 FIFTH_DB =
"fifth_db" 104 STAT_DOMAINS_DB =
"stat_domains_db" 105 MUTEX_LOCK_TTL =
"mutexLockTTL" 106 SITE_TEMPLATE =
"dc_site_template" 107 KEY_VALUE_STORAGE_DIR =
"key_value_storage_dir" 108 KEY_VALUE_DEFAULT_FILE =
"key_value_default_file" 109 RAW_DATA_DIR =
"raw_data_dir" 110 CONTENT_STORAGE_TYPE =
"content_storage_type" 111 CONTENT_TEMPLATE =
"dc_content_template" 112 STAT_TEMPLATE =
"dc_statistics_freq_template" 113 LOG_TEMPLATE =
"dc_statistics_log_template" 114 ATTR_TEMPLATE =
"dc_attribute_template" 115 SITE_PROP_NAME_FREQ =
"stats_freq_enabled" 116 SITE_PROP_NAME_LOG =
"stats_log_enabled" 117 SQL_OPERATION_LIST = [
"update",
"insert",
"delete",
"drop"]
123 def __init__(self, configParser, dbTaskMode=dc_event.Batch.DB_MODE_RW):
125 className = self.__class__.__name__
127 dbHost = configParser.get(className, self.
DB_HOST)
128 dbPort = int(configParser.get(className, self.
DB_PORT))
129 dbUser = configParser.get(className, self.
DB_USER)
130 dbPWD = configParser.get(className, self.
DB_PWD)
131 dbPrimary = configParser.get(className, self.
PRIMARY_DB)
132 dbSecondary = configParser.get(className, self.
SECONDARY_DB)
133 dbThird = configParser.get(className, self.
THIRD_DB)
134 dbFourth = configParser.get(className, self.
FOURTH_DB)
135 dbFifth = configParser.get(className, self.
FIFTH_DB)
136 dbStat = configParser.get(className, self.
STAT_DB)
137 dbLog = configParser.get(className, self.
LOG_DB)
138 dbAtt = configParser.get(className, self.
ATT_DB)
157 self.
dbConnections[Constants.PRIMARY_DB_ID] = mdb.connect(dbHost, dbUser, dbPWD, dbPrimary, dbPort, use_unicode=
True, charset=
'utf8')
158 self.
dbNames[dbPrimary] = Constants.PRIMARY_DB_ID
159 self.
dbConnections[Constants.SECONDARY_DB_ID] = mdb.connect(dbHost, dbUser, dbPWD, dbSecondary, dbPort, use_unicode=
True, charset=
'utf8')
160 self.
dbNames[dbSecondary] = Constants.SECONDARY_DB_ID
161 if dbThird
is not None and dbThird !=
'':
162 self.
dbConnections[Constants.THIRD_DB_ID] = mdb.connect(dbHost, dbUser, dbPWD, dbThird, dbPort, use_unicode=
True, charset=
'utf8')
163 self.
dbNames[dbThird] = Constants.THIRD_DB_ID
165 logger.debug(
">>> THIRD_DB_ID empty")
166 if dbFourth
is not None and dbFourth !=
'':
167 self.
dbConnections[Constants.FOURTH_DB_ID] = mdb.connect(dbHost, dbUser, dbPWD, dbFourth, dbPort, use_unicode=
True, charset=
'utf8')
168 self.
dbNames[dbFourth] = Constants.FOURTH_DB_ID
170 logger.debug(
">>> FOURTH_DB_ID empty")
171 if dbFifth
is not None and dbFifth !=
'':
172 self.
dbConnections[Constants.FIFTH_DB_ID] = mdb.connect(dbHost, dbUser, dbPWD, dbFifth, dbPort, use_unicode=
True, charset=
'utf8')
173 self.
dbNames[dbFifth] = Constants.FIFTH_DB_ID
175 logger.debug(
">>> FIFTH_DB_ID empty")
176 self.
dbConnections[Constants.STAT_DB_ID] = mdb.connect(dbHost, dbUser, dbPWD, dbStat, dbPort, use_unicode=
True, charset=
'utf8')
177 self.
dbNames[dbStat] = Constants.STAT_DB_ID
178 self.
dbConnections[Constants.LOG_DB_ID] = mdb.connect(dbHost, dbUser, dbPWD, dbLog, dbPort, use_unicode=
True, charset=
'utf8')
179 self.
dbNames[dbLog] = Constants.LOG_DB_ID
180 self.
dbConnections[Constants.LOG_DB_ID] = mdb.connect(dbHost, dbUser, dbPWD, dbLog, dbPort, use_unicode=
True, charset=
'utf8')
181 self.
dbNames[dbLog] = Constants.LOG_DB_ID
182 self.
dbConnections[Constants.ATT_DB_ID] = mdb.connect(dbHost, dbUser, dbPWD, dbAtt, dbPort, use_unicode=
True, charset=
'utf8')
183 self.
dbNames[dbAtt] = Constants.ATT_DB_ID
184 self.
dbConnections[Constants.STAT_DOMAINS_DB_ID] = mdb.connect(dbHost, dbUser, dbPWD, dbStatDomains, dbPort, use_unicode=
True, charset=
'utf8')
185 self.
dbNames[dbStatDomains] = Constants.STAT_DOMAINS_DB_ID
193 StatisticLogManager.GLOBAL_FREQ_ENABLED = bool(tmpVal)
194 except ConfigParser.NoOptionError:
198 StatisticLogManager.GLOBAL_LOG_ENABLED = bool(tmpVal)
199 except ConfigParser.NoOptionError:
207 if dbConnect
is not None:
216 logger.info(
"Request eventType: %s, eventObject type: %s",
217 str(drceSyncTasksCover.eventType), str(
type(drceSyncTasksCover.eventObject)))
218 if isinstance(drceSyncTasksCover.eventObject, list):
219 if len(drceSyncTasksCover.eventObject) > 0:
220 itemType = str(drceSyncTasksCover.eventObject[0])
223 logger.info(
"Request eventObject items: %s, item type: %s", str(len(drceSyncTasksCover.eventObject)), itemType)
228 responseObject =
None 230 if drceSyncTasksCover.eventType == EVENT_TYPES.SITE_DELETE:
232 responseObject = siteDeleteTask.process(drceSyncTasksCover.eventObject, self.
executeQuery)
233 responseEvent = EVENT_TYPES.SITE_DELETE_RESPONSE
236 elif drceSyncTasksCover.eventType == EVENT_TYPES.SITE_NEW:
239 responseObject = siteTask.process(drceSyncTasksCover.eventObject, self.
executeQuery)
240 responseEvent = EVENT_TYPES.SITE_NEW_RESPONSE
242 elif drceSyncTasksCover.eventType == EVENT_TYPES.SITE_UPDATE:
246 responseObject = siteUpdateTask.process(drceSyncTasksCover.eventObject, self.
executeQuery)
247 responseEvent = EVENT_TYPES.SITE_UPDATE_RESPONSE
249 elif drceSyncTasksCover.eventType == EVENT_TYPES.SITE_CLEANUP:
251 responseObject = siteCleanUpTask.process(drceSyncTasksCover.eventObject, self.
executeQuery)
252 responseEvent = EVENT_TYPES.SITE_CLEANUP_RESPONSE
255 elif drceSyncTasksCover.eventType == EVENT_TYPES.SITE_STATUS:
257 responseObject = siteStatusTask.process(drceSyncTasksCover.eventObject, self.
executeQuery)
258 responseEvent = EVENT_TYPES.SITE_STATUS_RESPONSE
261 elif drceSyncTasksCover.eventType == EVENT_TYPES.SITE_FIND:
264 responseObject = siteFindTask.process(drceSyncTasksCover.eventObject, self.
executeQuery)
265 responseEvent = EVENT_TYPES.SITE_FIND_RESPONSE
267 elif drceSyncTasksCover.eventType == EVENT_TYPES.URL_NEW:
271 responseObject = urlNewTask.process(drceSyncTasksCover.eventObject, self.
executeQuery)
272 responseEvent = EVENT_TYPES.URL_NEW_RESPONSE
274 elif drceSyncTasksCover.eventType == EVENT_TYPES.URL_STATUS:
276 responseObject = urlStatusTask.process(drceSyncTasksCover.eventObject, self.
executeQuery)
277 responseEvent = EVENT_TYPES.URL_STATUS_RESPONSE
279 elif drceSyncTasksCover.eventType == EVENT_TYPES.URL_UPDATE:
281 responseObject = urlUpdateTask.process(drceSyncTasksCover.eventObject, self.
executeQuery)
282 responseEvent = EVENT_TYPES.URL_UPDATE_RESPONSE
284 elif drceSyncTasksCover.eventType == EVENT_TYPES.URL_FETCH:
287 responseObject = urlFetchTask.process(drceSyncTasksCover.eventObject, self.
executeQuery)
288 responseEvent = EVENT_TYPES.URL_FETCH_RESPONSE
290 elif drceSyncTasksCover.eventType == EVENT_TYPES.URL_CONTENT:
293 responseObject = urlContentTask.process(drceSyncTasksCover.eventObject, self.
executeQuery)
294 responseEvent = EVENT_TYPES.URL_CONTENT_RESPONSE
296 elif drceSyncTasksCover.eventType == EVENT_TYPES.URL_DELETE:
298 responseObject = urlDeleteTask.process(drceSyncTasksCover.eventObject, self.
executeQuery)
299 responseEvent = EVENT_TYPES.URL_DELETE_RESPONSE
301 elif drceSyncTasksCover.eventType == EVENT_TYPES.URL_CLEANUP:
303 responseObject = urlCleanUpTask.process(drceSyncTasksCover.eventObject, self.
executeQuery)
304 responseEvent = EVENT_TYPES.URL_CLEANUP_RESPONSE
306 elif drceSyncTasksCover.eventType == EVENT_TYPES.SQL_CUSTOM:
309 responseEvent = EVENT_TYPES.SQL_CUSTOM_RESPONSE
311 elif drceSyncTasksCover.eventType == EVENT_TYPES.URL_PURGE:
313 responseObject = urlPurgeTask.process(drceSyncTasksCover.eventObject, self.
executeQuery)
314 responseEvent = EVENT_TYPES.URL_PURGE_RESPONSE
316 elif drceSyncTasksCover.eventType == EVENT_TYPES.FIELD_RECALCULATE:
318 responseObject = fieldRecalculatorTask.process(drceSyncTasksCover.eventObject, self.
executeQuery)
319 responseEvent = EVENT_TYPES.FIELD_RECALCULATE_RESPONSE
321 elif drceSyncTasksCover.eventType == EVENT_TYPES.URL_VERIFY:
324 responseEvent = EVENT_TYPES.URL_VERIFY_RESPONSE
326 elif drceSyncTasksCover.eventType == EVENT_TYPES.URL_AGE:
328 responseObject = urlAgeTask.process(drceSyncTasksCover.eventObject, self.
executeQuery)
329 responseEvent = EVENT_TYPES.URL_AGE_RESPONSE
331 elif drceSyncTasksCover.eventType == EVENT_TYPES.URL_PUT:
333 responseObject = urlPutTask.process(drceSyncTasksCover.eventObject, self.
executeQuery)
334 responseEvent = EVENT_TYPES.URL_PUT_RESPONSE
336 elif drceSyncTasksCover.eventType == EVENT_TYPES.URL_HISTORY:
338 responseObject = urlHistoryTask.process(drceSyncTasksCover.eventObject, self.
executeQuery)
339 responseEvent = EVENT_TYPES.URL_HISTORY_RESPONSE
341 elif drceSyncTasksCover.eventType == EVENT_TYPES.URL_STATS:
343 responseObject = urlStatsTask.process(drceSyncTasksCover.eventObject, self.
executeQuery)
344 responseEvent = EVENT_TYPES.URL_STATS_RESPONSE
346 elif drceSyncTasksCover.eventType == EVENT_TYPES.PROXY_NEW:
348 responseObject = proxyNewTask.process(drceSyncTasksCover.eventObject, self.
executeQuery)
349 responseEvent = EVENT_TYPES.PROXY_NEW_RESPONSE
351 elif drceSyncTasksCover.eventType == EVENT_TYPES.PROXY_UPDATE:
353 responseObject = proxyUpdateTask.process(drceSyncTasksCover.eventObject, self.
executeQuery)
354 responseEvent = EVENT_TYPES.PROXY_UPDATE_RESPONSE
356 elif drceSyncTasksCover.eventType == EVENT_TYPES.PROXY_DELETE:
358 responseObject = proxyDeleteTask.process(drceSyncTasksCover.eventObject, self.
executeQuery)
359 responseEvent = EVENT_TYPES.PROXY_DELETE_RESPONSE
361 elif drceSyncTasksCover.eventType == EVENT_TYPES.PROXY_STATUS:
363 responseObject = proxyStatusTask.process(drceSyncTasksCover.eventObject, self.
executeQuery)
364 responseEvent = EVENT_TYPES.PROXY_STATUS_RESPONSE
366 elif drceSyncTasksCover.eventType == EVENT_TYPES.PROXY_FIND:
368 responseObject = proxyFindTask.process(drceSyncTasksCover.eventObject, self.
executeQuery)
369 responseEvent = EVENT_TYPES.PROXY_FIND_RESPONSE
371 elif drceSyncTasksCover.eventType == EVENT_TYPES.ATTR_SET:
373 responseObject = attrSetTask.process(drceSyncTasksCover.eventObject, self.
executeQuery)
374 responseEvent = EVENT_TYPES.ATTR_SET_RESPONSE
376 elif drceSyncTasksCover.eventType == EVENT_TYPES.ATTR_UPDATE:
378 responseObject = attrUpdateTask.process(drceSyncTasksCover.eventObject, self.
executeQuery)
379 responseEvent = EVENT_TYPES.ATTR_UPDATE_RESPONSE
381 elif drceSyncTasksCover.eventType == EVENT_TYPES.ATTR_DELETE:
383 responseObject = attrDeleteTask.process(drceSyncTasksCover.eventObject, self.
executeQuery)
384 responseEvent = EVENT_TYPES.ATTR_DELETE_RESPONSE
386 elif drceSyncTasksCover.eventType == EVENT_TYPES.ATTR_FETCH:
388 responseObject = attrFetchTask.process(drceSyncTasksCover.eventObject, self.
executeQuery)
389 responseEvent = EVENT_TYPES.ATTR_FETCH_RESPONSE
400 def executeQuery(self, query, dbConnectionName, executeType=Constants.EXEC_INDEX, SQLErrorClear=False):
402 if executeType == Constants.EXEC_INDEX:
404 elif executeType == Constants.EXEC_NAME:
415 dbConnection.rollback()
417 if not SQLErrorClear:
430 if self.
dbTaskMode & dc_event.Batch.DB_MODE_W == 0:
431 low_query = query.lower()
433 if low_query.startswith(elem):
434 logger.debug(
">>> QUERY = " + query)
435 logger.debug(
">>> NOT SUPPORT in read only mode")
454 with closing(dbConnection.cursor())
as cursor:
456 cursor.execute(query)
457 dbConnection.commit()
458 ret = cursor.fetchall()
459 except mdb.Error
as err:
462 logger.debug(
">>> dbConnection Not Found = " + str(dbConnectionName))
479 with closing(dbConnection.cursor(MySQLdb.cursors.DictCursor))
as cursor:
481 cursor.execute(query)
482 dbConnection.commit()
483 ret = cursor.fetchall()
484 except mdb.Error
as err:
487 logger.debug(
">>> dbConnection Not Found = " + str(dbConnectionName))
495 for inputType
in eventCheckDict:
496 if drceSyncTasksCover.eventType
is inputType:
497 if isinstance(eventCheckDict[inputType], tuple):
498 for localType
in eventCheckDict[inputType]:
499 if isinstance(drceSyncTasksCover.eventObject, localType):
501 elif isinstance(drceSyncTasksCover.eventObject, eventCheckDict[inputType]):
503 raise Exception(
"Incorrect types of input data")
def checkInputData(self, drceSyncTasksCover)
string KEY_VALUE_DEFAULT_FILE
def executeQueryByName(self, query, dbConnectionName, SQLErrorClear=False)
def __init__(self, configParser, dbTaskMode=dc_event.Batch.DB_MODE_RW)
string SITE_PROP_NAME_FREQ
def createDRCESyncTasksCover(self, eventType, eventObject)
string KEY_VALUE_STORAGE_DIR
def executeQueryByIndex(self, query, dbConnectionName, SQLErrorClear=False)
def executeQuery(self, query, dbConnectionName, executeType=Constants.EXEC_INDEX, SQLErrorClear=False)
def checkQueryInReadOnly(self, query)
def backDBResolve(self, dbName)
def commonQueryErrorHandler(self, err, dbConnection, SQLErrorClear=False)
string CONTENT_STORAGE_TYPE
string SITE_PROP_NAME_LOG
def process(self, drceSyncTasksCover)