HCE Project Python language Distributed Tasks Manager Application, Distributed Crawler Application and client API bindings.  2.0.0-chaika
Hierarchical Cluster Engine Python language binding
SiteTask.py
Go to the documentation of this file.
1 '''
2 @package: dc
3 @author igor
4 @link: http://hierarchical-cluster-engine.com/
5 @copyright: Copyright © 2013-2014 IOIX Ukraine
6 @license: http://hierarchical-cluster-engine.com/license/
7 @since: 0.1
8 '''
9 import hashlib
10 import copy
11 import MySQLdb
12 from dateutil.parser import parse
13 import dc.EventObjects
14 import app.SQLCriterions
15 import app.Utils as Utils # pylint: disable=F0401
16 from app.Utils import SQLExpression
17 from dtm.EventObjects import GeneralResponse
18 from dc_db.BaseTask import BaseTask
19 from dc_db.Constants import DC_URLS_TABLE_NAME_TEMPLATE
20 import dc_db.Constants as Constants
21 from dc_db.StatisticLogManager import StatisticLogManager
22 from dc_db.SiteDeleteTask import SiteDeleteTask
23 
# Module-level logger shared by every method below; obtained from the Utils.MPLogger wrapper.
24 logger = Utils.MPLogger().getLogger()
25 
26 
27 # @todo move to appropriate place
28 
30 
# Field-name string constants. Presumably these name the Site attributes handled by
# this task (site.urls, site.properties, site.filters) - TODO confirm against callers,
# none of the visible methods reads them.
31  FIELD_NAME_URLS = "urls"
32  FIELD_NAME_PROPERTIES = "properties"
33  FIELD_NAME_FILTERS = "filters"
34 
35 
36  # #constructor
37  #
38  # @param dcSiteTemplate path to sql template for dc_urls_* tables
def __init__(self, dcSiteTemplate, keyValueDefaultFile, keyValueStorageDir, dBDataTask, dcStatTemplates,
             dcLogTemplate, dcAttrTemplate, tasksManager=None):
    """Remember the table templates and collaborators used by the site operations.

    @param dcSiteTemplate path to sql template for dc_urls_* tables
    @param keyValueDefaultFile stored on the instance as is
    @param keyValueStorageDir stored on the instance and handed to SiteDeleteTask
    @param dBDataTask key-value data task; when None no rollback task is created
    @param dcStatTemplates template used by process() for the statistics db table
    @param dcLogTemplate template used by process() for the log db table
    @param dcAttrTemplate template used by process() for the attributes db table
    @param tasksManager optional object whose SQLErrorCode is checked by process()
    """
    super(SiteTask, self).__init__()
    self.dcSiteTemplate = dcSiteTemplate
    self.keyValueDefaultFile = keyValueDefaultFile
    self.keyValueStorageDir = keyValueStorageDir
    self.dBDataTask = dBDataTask
    self.dcStatTemplates = dcStatTemplates
    self.dcLogTemplate = dcLogTemplate
    self.dcAttrTemplate = dcAttrTemplate
    # The rollback helper needs the data task (and its rawDataDir), so it only exists with one.
    self.siteDeleteTask = (SiteDeleteTask(keyValueStorageDir, dBDataTask.rawDataDir, dBDataTask)
                           if dBDataTask is not None else None)
    self.tasksManager = tasksManager
54 
55 
56  # #make all necessary actions to add new site into mysql db
57  #
58  # @param site instance of Site object
59  # @param queryCallback function for queries execution
60  # @return generalResponse instance of GeneralResponse object
def process(self, site, queryCallback):
    """Run every step needed to register a new site, rolling back on SQL errors.

    @param site instance of Site object
    @param queryCallback function for queries execution
    @return GeneralResponse whose statuses list holds site.id on success, or None
            together with a duplicate / SQL error code otherwise
    """
    response = GeneralResponse()

    def sqlOk():
        # Without a tasks manager there is no error code to inspect, so every step runs.
        return self.tasksManager is None or self.tasksManager.SQLErrorCode == Constants.EXIT_CODE_OK

    def moveRootUrls():
        if site.moveURLs:
            self.addSiteURLURLs(site, queryCallback)

    if self.isSiteExist(site.id, queryCallback, site.userId):
        response = GeneralResponse(Constants.TASK_DUPLICATE_ERR, Constants.TASK_DUPLICATE_ERR_MSG)
        response.statuses.append(None)
        return response

    # Ordered steps that follow addSite(); the SQL status is re-checked before each one.
    guardedSteps = (
        lambda: self.addSitesFilter(site, queryCallback),
        lambda: self.addSiteProperties(site, queryCallback),
        lambda: self.createTableFromTemplate(site, self.dcSiteTemplate, Constants.SECONDARY_DB_ID, queryCallback),
        # Same structure as for urls_<site_id> but without an unique key
        lambda: self.createTableFromTemplate(site, self.dcSiteTemplate, Constants.FOURTH_DB_ID, queryCallback,
                                             {'PRIMARY KEY':'KEY'}),
        lambda: self.createTableFromTemplate(site, self.dcStatTemplates, Constants.STAT_DB_ID, queryCallback),
        lambda: self.createTableFromTemplate(site, self.dcLogTemplate, Constants.LOG_DB_ID, queryCallback),
        lambda: self.createTableFromTemplate(site, self.dcAttrTemplate, Constants.ATT_DB_ID, queryCallback),
        lambda: self.addSiteURLSites(site, queryCallback),
        moveRootUrls,
        lambda: self.addSiteInKVDB(site, queryCallback),
    )

    self.addSite(site, queryCallback)
    for runStep in guardedSteps:
        if sqlOk():
            runStep()

    if sqlOk():
        response.statuses.append(site.id)
    else:
        logger.debug(">>> Was some sql error while SITE_NEW operation start SiteDelete/rollback")
        self.siteDelete(site, queryCallback)
        response = GeneralResponse(Constants.TASK_SQL_ERR, Constants.TASK_SQL_ERR_MSG)
        response.statuses.append(None)

    return response
101 
102 
103  # # add data in sites table
104  #
105  # @param site instance of Site object
106  # @param queryCallback function for queries execution
def addSite(self, site, queryCallback):
    """Insert the site's own fields into the `sites` table of the primary db.

    @param site instance of Site object
    @param queryCallback function for queries execution
    """
    columns, columnValues = Constants.getFieldsValuesTuple(site, Constants.siteDict)
    assignments = Constants.createFieldsValuesString(columns, columnValues)
    if len(assignments) == 0:
        # Nothing to insert: no query is issued at all.
        return
    insertQuery = Constants.INSERT_COMMON_TEMPLATE % ("sites", assignments)
    queryCallback(insertQuery, Constants.PRIMARY_DB_ID)
115 
116 
117  # # add data in sites_filters table
118  #
119  # @param site instance of Site object
120  # @param queryCallback function for queries execution
def addSitesFilter(self, site, queryCallback):
    """Insert one row per non-None entry of site.filters using SITE_FILTER_SQL_TEMPLATE.

    @param site instance of Site object
    @param queryCallback function for queries execution
    """
    if site.filters is None:
        return
    for siteFilter in site.filters:
        if siteFilter is None:
            continue
        # A missing update date is written as the SQL keyword NULL.
        uDateSql = "NULL" if siteFilter.uDate is None else str(siteFilter.uDate)
        templateArgs = (
            Utils.escape(str(siteFilter.siteId)),
            Utils.escape(str(siteFilter.pattern)),
            Utils.escape(str(siteFilter.subject)),
            str(siteFilter.opCode),
            str(siteFilter.stage),
            str(siteFilter.action),
            uDateSql,
            str(siteFilter.type),
            str(siteFilter.mode),
            str(siteFilter.state),
            str(siteFilter.groupId),
        )
        queryCallback(Constants.SITE_FILTER_SQL_TEMPLATE % templateArgs, Constants.PRIMARY_DB_ID)
134 
135 
136 
137  # # update data in sites_filters table
138  #
139  # @param site instance of Site object
140  # @param queryCallback function for queries execution
def updateSitesFilter(self, site, queryCallback):
    """Issue one SITE_FILTER_SQL_UPDATE query per non-None entry of site.filters.

    @param site instance of Site object
    @param queryCallback function for queries execution
    """
    if site.filters is None:
        return
    for siteFilter in site.filters:
        if siteFilter is None:
            continue
        if siteFilter.uDate is None:
            # Referencing the column itself leaves the stored UDate unchanged.
            uDateSql = "`UDate`"
        else:
            uDateSql = "'" + str(siteFilter.uDate) + "'"
        templateArgs = (
            Utils.escape(str(siteFilter.pattern)),
            Utils.escape(str(siteFilter.subject)),
            str(siteFilter.opCode),
            str(siteFilter.stage),
            str(siteFilter.action),
            uDateSql,
            str(siteFilter.groupId),
            Utils.escape(str(siteFilter.siteId)),
            str(siteFilter.type),
            str(siteFilter.mode),
            str(siteFilter.state),
        )
        queryCallback(Constants.SITE_FILTER_SQL_UPDATE % templateArgs, Constants.PRIMARY_DB_ID)
155 
156 
157  # # creates site properties dict
158  #
159  # @field - incoming site properties field
160  # @site - incoming site
161  # @return created site properties dict
def createPropDict(self, field, site):
    """Build the dict describing one site property, ready for the sites_properties queries.

    Two shapes of site.properties are supported:
      - dict: `field` is a key; the value is either a plain value or a list read as
        [siteId, value, urlMd5] (missing or None items are skipped, siteId falls back to site.id);
      - list: `field` is itself a dict of property fields and is used as is.
    Any other combination yields an empty dict.

    "uDate" / "cDate" entries are then rewritten: a string accepted by isIsoFormatDate() is
    wrapped in single quotes, anything else is wrapped in SQLExpression.

    @param field incoming site properties field (dict key or property dict)
    @param site incoming site
    @return created site properties dict (always a new object, `field` is never modified)
    """
    localDict = {}
    if isinstance(site.properties, dict):
        rawValue = site.properties[field]
        if isinstance(rawValue, list):
            # Pad with None so lists shorter than three items do not raise IndexError.
            siteId, value, urlMd5 = (list(rawValue) + [None, None, None])[:3]
            localDict["siteId"] = str(siteId) if siteId is not None else site.id
            localDict["name"] = str(field)
            if value is not None:
                localDict["value"] = str(value)
            if urlMd5 is not None:
                localDict["urlMd5"] = str(urlMd5)
        else:
            localDict["siteId"] = site.id
            localDict["name"] = str(field)
            localDict["value"] = str(rawValue)
    elif isinstance(site.properties, list) and isinstance(field, dict):
        # Work on a copy: the date rewriting below must not leak into site.properties,
        # otherwise a second call would quote the already quoted value again.
        localDict = copy.copy(field)

    for dateKey in ("uDate", "cDate"):
        dateValue = localDict.get(dateKey)
        if dateValue is None:
            continue
        if self.isIsoFormatDate(dateValue):
            logger.debug("!!! localDict[\"" + dateKey + "\"] has ISO format datetime: " + str(dateValue))
            localDict[dateKey] = "'" + str(dateValue) + "'"
        else:
            logger.debug("!!! localDict[\"" + dateKey + "\"] has not ISO format datetime: " + str(dateValue))
            localDict[dateKey] = SQLExpression(dateValue)

    return localDict
199 
200 
201  # # Check format date value
202  #
203  # @param dateValue - incoming datetime value
204  # @return True - if the value is a string containing a valid datetime in ISO format, otherwise False
def isIsoFormatDate(self, dateValue):
    """Tell whether dateValue is a string that dateutil's parse() accepts.

    NOTE(review): despite the name, the check is "parse() did not raise"; dateutil is
    documented to accept many non-ISO formats as well - confirm this is intended.

    @param dateValue incoming datetime value
    @return True if dateValue is a string and was parsed without an error, otherwise False
    """
    try:
        stringTypes = basestring  # Python 2: covers both str and unicode
    except NameError:
        stringTypes = str  # Python 3 has no basestring
    if not isinstance(dateValue, stringTypes):
        return False
    try:
        parse(dateValue)
    except Exception as err:  # any parser failure just means "not a date" here
        logger.debug("value '" + str(dateValue) + "' has not ISO format, try parse error: " + str(err))
        return False
    return True
216 
217 
218  # # add data in sites_properties table
219  #
220  # @param site instance of Site object
221  # @param queryCallback function for queries execution
def addSiteProperties(self, site, queryCallback):
    """Insert one `sites_properties` row per non-None entry of site.properties.

    @param site instance of Site object
    @param queryCallback function for queries execution
    """
    if site.properties is None:
        return
    for prop in site.properties:
        if prop is None:
            continue
        propFields = self.createPropDict(prop, site)
        columns, columnValues = Constants.getFieldsValuesTuple(propFields, Constants.propDict)
        # Skip entries that produced no columns or mismatched column/value lists.
        if len(columns) == 0 or len(columns) != len(columnValues):
            continue
        assignments = Constants.createFieldsValuesString(columns, columnValues)
        insertQuery = Constants.INSERT_COMMON_TEMPLATE % ("sites_properties", assignments)
        queryCallback(insertQuery, Constants.PRIMARY_DB_ID)
235 
236 
237  # # update data in sites_properties table
238  #
239  # @param site instance of Site object
240  # @param queryCallback function for queries execution
def updateSiteProperties(self, site, queryCallback):
    """Issue a SITE_PROP_SQL_UPDATE query per entry of site.properties that has siteId and name.

    @param site instance of Site object
    @param queryCallback function for queries execution
    """
    if site.properties is None:
        return
    for prop in site.properties:
        if prop is None:
            continue
        propFields = self.createPropDict(prop, site)
        # siteId and name are the last two template arguments, so both are mandatory.
        if propFields.get("siteId") is None or propFields.get("name") is None:
            logger.debug("Site.Properties \"SiteId\" or \"name\" fields are empty")
            continue
        columns, columnValues = Constants.getFieldsValuesTuple(propFields, Constants.propDict)
        if len(columns) == 0 or len(columns) != len(columnValues):
            continue
        assignments = Constants.createFieldsValuesString(columns, columnValues)
        updateQuery = Constants.SITE_PROP_SQL_UPDATE % (assignments, propFields["siteId"], propFields["name"])
        queryCallback(updateQuery, Constants.PRIMARY_DB_ID)
258 
259 
260  # # add data in sites_urls table
261  #
262  # @param site instance of Site object
263  # @param queryCallback function for queries execution
def addSiteURLSites(self, site, queryCallback):
    """Run SITE_URL_SQL_TEMPLATE once for every non-None object in site.urls.

    @param site instance of Site object
    @param queryCallback function for queries execution
    """
    for rootUrl in site.urls:
        if rootUrl is None:
            continue
        columns, columnValues = Constants.getFieldsValuesTuple(rootUrl, Constants.SiteURLTableDitct)
        assignments = Constants.createFieldsValuesString(columns, columnValues)
        insertQuery = Constants.SITE_URL_SQL_TEMPLATE % assignments
        queryCallback(insertQuery, Constants.PRIMARY_DB_ID, Constants.EXEC_INDEX, True)
271 
272 
273  # # update data in sites_urls table
274  #
275  # @param site instance of Site object
276  # @param queryCallback function for queries execution
def updateSiteURLSites(self, site, queryCallback):
    """Update the site's root URL rows and keep `URLMd5` in sync with `URL`.

    For every non-None object in site.urls:
      1. if both url and urlMd5 are given, refuse the update when the same URL is already
         stored under a different URLMd5 (the entry is skipped entirely);
      2. when only url is given, urlMd5 is filled in with md5(url) and the row is matched by URL,
         otherwise the row is matched by URLMd5;
      3. if the given urlMd5 differs from md5(url), a second query rewrites URLMd5;
      4. statisticLogUpdate() is called for the entry, also when a query raised.
    Errors raised while processing one entry are logged and do not stop the loop.

    @param site instance of Site object
    @param queryCallback function for queries execution
    """
    query = None
    urlMd5Defined = False
    for urlObject in site.urls:
        if urlObject is None:
            # Nothing to update and nothing to log statistics for.
            continue
        try:
            urlMd5 = hashlib.md5(urlObject.url).hexdigest() if urlObject.url is not None else None
            # Check is URL already exists with urlMd5 different from got to update
            if urlObject.urlMd5 is not None and urlObject.url is not None:
                query = str(Constants.SITE_URL_SQL_SELECT_COUNT + "`URL`='%s' AND `URLMd5`<>'%s'") % \
                        (Utils.escape(urlObject.url), urlObject.urlMd5)
                r = queryCallback(query, Constants.PRIMARY_DB_ID)
                # A failed query (None) is treated like a found duplicate.
                if r is None or (len(r) > 0 and len(r[0]) > 0 and r[0][0] > 0):
                    logger.error("Root URL '%s' already exists in dc_sites.sites_urls!", str(urlObject.url))
                    continue
            if urlObject.url is not None and urlObject.urlMd5 is None:
                urlMd5Defined = False
                urlObject.urlMd5 = urlMd5
            elif urlObject.urlMd5 is not None:
                urlMd5Defined = True
            else:
                urlMd5Defined = False
            fields, values = Constants.getFieldsValuesTuple(urlObject, Constants.SiteURLTableDitct)
            fieldValueString = Constants.createFieldsValuesString(fields, values)
            if urlMd5Defined is False and urlObject.url is not None:
                # The URL is escaped here exactly as in the two other queries of this method.
                query = str(Constants.SITE_URL_SQL_UPDATE + " AND `URL`='%s'") % (fieldValueString, site.id,
                                                                                 Utils.escape(urlObject.url))
            elif urlMd5Defined is True:
                query = str(Constants.SITE_URL_SQL_UPDATE + " AND `URLMd5`='%s'") % (fieldValueString, site.id,
                                                                                    urlObject.urlMd5)
            else:
                query = None
            if query is not None:
                queryCallback(query, Constants.PRIMARY_DB_ID)
            else:
                logger.error('No url or urlMd5 value in urlObject, update query not created!\n%s', str(urlObject))
            # Additionally update the urlMd5 after URL update to synch them if urlMd5 not matched
            if urlMd5Defined and urlObject.url is not None and urlMd5 != urlObject.urlMd5:
                query = (Constants.SITE_URL_SQL_UPDATE + " AND `URL`='%s'") % ('URLMd5="' + str(urlMd5) + '"', site.id,
                                                                              Utils.escape(urlObject.url))
                logger.debug('Update old URLMd5 with new: %s for site: %s, url: %s, query: %s', urlMd5, str(site.id),
                             urlObject.url, str(query))
                queryCallback(query, Constants.PRIMARY_DB_ID)
        except Exception as err:
            logger.error("Error: %s, query:%s, urlMd5Defined: %s, urlObject.url: %s", str(err), str(query),
                         str(urlMd5Defined), str(urlObject.url))
            logger.error(Utils.getTracebackInfo())

        self.statisticLogUpdate(site, urlObject.urlMd5, site.id, urlObject.status, queryCallback)
328 
329 
330  # # add data in dc_urls.url_site.id table
331  #
332  # @param site instance of Site object
333  # @param queryCallback function for queries execution
def addSiteURLURLs(self, site, queryCallback):
    """Copy the site's root URLs into its per-site urls table in the secondary db.

    For every usable object in site.urls a statistic log record is added and an
    `INSERT IGNORE` is issued. When `site` is a dc.EventObjects.Site the row is built from
    a copy of the url object, overridden with the site's id, urlType, requestDelay and
    httpTimeout (the last three only when set); otherwise only Site_Id, URL and URLMD5
    are written. Entries that are None or have no url are logged and skipped, because no
    md5 can be computed for them.

    @param site instance of Site object
    @param queryCallback function for queries execution
    """
    # Both values depend on the site only, so compute them once.
    tableName = DC_URLS_TABLE_NAME_TEMPLATE % site.id
    isSiteObject = isinstance(site, dc.EventObjects.Site)
    for urlObject in site.urls:
        if urlObject is None or urlObject.url is None:
            logger.error("Root URL object without url skipped for site: %s", str(site.id))
            continue
        urlMD5 = hashlib.md5(urlObject.url).hexdigest()
        StatisticLogManager.addNewRecord(queryCallback, site.id, urlMD5)
        if isSiteObject:
            # Shallow copy: the caller's url object keeps its own values.
            localURLObject = copy.copy(urlObject)
            localURLObject.siteId = site.id
            if site.urlType is not None:
                localURLObject.type = site.urlType
            if site.requestDelay is not None:
                localURLObject.requestDelay = site.requestDelay
            if site.httpTimeout is not None:
                localURLObject.httpTimeout = site.httpTimeout
            localURLObject.urlMd5 = urlMD5
            fields, values = Constants.getFieldsValuesTuple(localURLObject, Constants.URLTableDict)
            fieldValueString = Constants.createFieldsValuesString(fields, values)
            query = "INSERT IGNORE INTO `%s` SET %s" % (tableName, fieldValueString)
        else:
            query = ("INSERT IGNORE INTO `%s` SET `Site_Id`='%s', `URL`='%s', `URLMD5`='%s'" %
                     (tableName, site.id, MySQLdb.escape_string(urlObject.url), urlMD5))  # pylint: disable=E1101
        queryCallback(query, Constants.SECONDARY_DB_ID)
357 
358 
359  # # create new table in dc_urls db
360  #
361  # @param site instance of Site object
362  # @param template path to the current create table template file
363  # @param dbId current database id
364  # @param replaceDic strings dict() to replace key with value in template
365  # @param queryCallback function for queries execution
def createTableFromTemplate(self, site, template, dbId, queryCallback, replaceDic=None):
    """Read a CREATE TABLE template file, customise it for the site and execute it.

    Every key of replaceDic is replaced with its value first, then the "%SITE_ID%"
    placeholder is replaced with str(site.id). Any error (unreadable file, callback
    failure) is logged and swallowed, so the method never raises.

    @param site instance of Site object
    @param template path to the current create table template file
    @param dbId current database id
    @param queryCallback function for queries execution
    @param replaceDic strings dict() to replace key with value in template
    """
    try:
        # The context manager closes the template file even if read() fails.
        with open(template) as templateFile:
            query = templateFile.read()
        for rFrom, rTo in (replaceDic or {}).items():
            query = query.replace(rFrom, rTo)
        query = query.replace("%SITE_ID%", str(site.id))
        queryCallback(query, dbId)
    except Exception as err:
        logger.error("DB error: %s\nquery: %s", str(err), template)
377 
378 
379  # # create empty file for Key-Value db
380  #
381  # @param site instance of Site object
382  # @param queryCallback callback for sql requests
def addSiteInKVDB(self, site, queryCallback):
    """Send a DataCreateRequest for the site to the key-value data task, if one is configured.

    @param site instance of Site object
    @param queryCallback callback for sql requests
    @return whatever dBDataTask.process() returns, or None when there is no dBDataTask
    """
    if self.dBDataTask is None:
        return None
    createRequest = dc.EventObjects.DataCreateRequest(site.id, None, None)
    return self.dBDataTask.process(createRequest, queryCallback)
389 
390 
391  # # makes rollback operation for current site if the SITE_NEW operation failed
392  #
393  # @param site instance of Site object
394  # @param queryCallback callback for sql requests
def siteDelete(self, site, queryCallback):
    """Roll a partially created site back by running a SiteDelete through siteDeleteTask.

    Does nothing when no siteDeleteTask was created in the constructor.

    @param site instance of Site object
    @param queryCallback callback for sql requests
    """
    if self.siteDeleteTask is None:
        return
    deleteRequest = dc.EventObjects.SiteDelete(site.id)
    self.siteDeleteTask.process(deleteRequest, queryCallback)
399 
400 
401  # #execSiteCriterions method extracts site ids by site criterions
402  #
403  # @param criterions - criterions dict
404  # @param queryCallback function for queries execution
405  # @return list of appropriate site's ids
@staticmethod
def execSiteCriterions(criterions, queryCallback):
    """Select the ids of the sites matching the given criterions.

    @param criterions criterions dict
    @param queryCallback function for queries execution
    @return list with the first column of every returned row; empty when no SQL clause
            was generated or the query result is not iterable
    """
    whereClause = app.SQLCriterions.generateCriterionSQL(criterions, None)
    # Without a generated clause no query is sent at all.
    if whereClause is None or whereClause == "":
        return []
    rows = queryCallback("SELECT `Id` FROM `sites`" + whereClause, Constants.PRIMARY_DB_ID)
    if not hasattr(rows, '__iter__'):
        return []
    return [row[0] for row in rows]
def isSiteExist(self, siteId, queryCallback, userId=None)
Definition: BaseTask.py:29
def updateSiteURLSites(self, site, queryCallback)
Definition: SiteTask.py:277
def __init__(self, dcSiteTemplate, keyValueDefaultFile, keyValueStorageDir, dBDataTask, dcStatTemplates, dcLogTemplate, dcAttrTemplate, tasksManager=None)
Definition: SiteTask.py:40
def addSiteURLSites(self, site, queryCallback)
Definition: SiteTask.py:264
GeneralResponse event object, represents general state response for multipurpose usage.
def generateCriterionSQL(criterions, additionWhere=None, siteId=None)
def siteDelete(self, site, queryCallback)
Definition: SiteTask.py:395
def addSitesFilter(self, site, queryCallback)
Definition: SiteTask.py:121
def execSiteCriterions(criterions, queryCallback)
Definition: SiteTask.py:407
def createPropDict(self, field, site)
Definition: SiteTask.py:162
def statisticLogUpdate(self, localObj, urlMd5, siteId, status, queryCallback, isInsert=False)
Definition: BaseTask.py:154
def isIsoFormatDate(self, dateValue)
Definition: SiteTask.py:205
def addSiteURLURLs(self, site, queryCallback)
Definition: SiteTask.py:334
def updateSitesFilter(self, site, queryCallback)
Definition: SiteTask.py:141
def updateSiteProperties(self, site, queryCallback)
Definition: SiteTask.py:241
def addSite(self, site, queryCallback)
Definition: SiteTask.py:107
def addSiteProperties(self, site, queryCallback)
Definition: SiteTask.py:222
def addSiteInKVDB(self, site, queryCallback)
Definition: SiteTask.py:383
def createTableFromTemplate(self, site, template, dbId, queryCallback, replaceDic=None)
Definition: SiteTask.py:366
def process(self, site, queryCallback)
Definition: SiteTask.py:61