HCE Project Python language Distributed Tasks Manager Application, Distributed Crawler Application and client API bindings.  2.0.0-chaika
Hierarchical Cluster Engine Python language binding
SiteUpdateTask.py
Go to the documentation of this file.
1 '''
2 @package: dc
3 @author igor
4 @link: http://hierarchical-cluster-engine.com/
5 @copyright: Copyright © 2013-2014 IOIX Ukraine
6 @license: http://hierarchical-cluster-engine.com/license/
7 @since: 0.1
8 '''
9 
10 import hashlib
11 import dc.EventObjects
12 import app.Utils as Utils # pylint: disable=F0401
13 import app.SQLCriterions
14 from dtm.EventObjects import GeneralResponse
15 import dc_db.Constants as Constants
16 from dc_db.FieldRecalculator import FieldRecalculator
17 from dc_db.BaseTask import BaseTask
18 from dc_db.SiteTask import SiteTask
19 from dc_db.URLDeleteTask import URLDeleteTask
20 
# Module-level logger obtained through Utils.MPLogger; used by every method below.
logger = Utils.MPLogger().getLogger()


# #sql query which checks existence of a table
# CHECK_TABLE_SQL_TEMPLATE = " SELECT COUNT(*) FROM sites WHERE Id = '%s'"

# @todo move to appropriate place
# Status code appended to the response statuses when a requested site id does not exist.
TASK_NOT_EXIST_ERR = 2020
# Text counterpart of TASK_NOT_EXIST_ERR; not referenced by the code visible in this file.
TASK_NOT_EXISTS_ERR_MSG = "Site Id Not Found"
30 
31 
33 
34 
35  # #constructor
36  #
37  # @param dcSiteTemplate path to sql template for dc_urls_* tables
38  def __init__(self, dcSiteTemplate, keyValueDefaultFile, keyValueStorageDir, rawDataDir, dBDataTask, dcStatTemplate,
39  dcLogTemplate, dcAttrTemplate):
40  super(SiteUpdateTask, self).__init__(dcSiteTemplate, keyValueDefaultFile, keyValueStorageDir, dBDataTask,
41  dcStatTemplate, dcLogTemplate, dcAttrTemplate)
42  self.urlDeleteTask = URLDeleteTask(keyValueStorageDir, rawDataDir, dBDataTask)
44 
45 
46  # #make all necessary actions to update site info in the mysql db
47  #
48  # @param siteUpdate instance of SiteUpdate object
49  # @param queryCallback function for queries execution
50  # @return generalResponse instance of GeneralResponse object
51  def process(self, siteUpdate, queryCallback):
52  siteIds = []
53  response = GeneralResponse()
54  if siteUpdate.id is None:
55  siteIds.extend(self.fetchByCriterions(siteUpdate.criterions, queryCallback))
56  else:
57  siteIds.append(siteUpdate.id)
58 
59  for siteId in siteIds:
60  siteUpdate.id = siteId
61  if super(SiteUpdateTask, self).isSiteExist(siteUpdate.id, queryCallback):
62  self.updateSite(siteUpdate, queryCallback)
63  self.updateSitesFilter(siteUpdate, queryCallback)
64  self.updateSiteProperties(siteUpdate, queryCallback)
65  sitePropValue = BaseTask.readValueFromSiteProp(siteUpdate.id, "UPDATE_NOT_INSERT_ROOT_URLS", queryCallback)
66  if siteUpdate.updateType != dc.EventObjects.SiteUpdate.UPDATE_TYPE_UPDATE and\
67  (sitePropValue is None or not bool(int(sitePropValue))) and siteUpdate.urls is not None:
68  self.deleteRootUrlsFromURLsUrls(siteUpdate.id, queryCallback)
69  elif siteUpdate.updateType == dc.EventObjects.SiteUpdate.UPDATE_TYPE_UPDATE and siteUpdate.urls is not None:
70  self.updateURlsURL(siteUpdate.urls, siteId, queryCallback)
71  self.updateSiteURL(siteUpdate, queryCallback)
72  self.restartSite(siteUpdate, queryCallback)
73  if siteUpdate.updateType != dc.EventObjects.SiteUpdate.UPDATE_TYPE_UPDATE and\
74  (sitePropValue is None or not bool(int(sitePropValue))) and siteUpdate.urls is not None:
75  logger.debug("Copy root URLs from dc_sites.site_urls table to dc_urls.urls_*")
76  self.copyURLsFromSiteURlsToURLsURLs(siteUpdate.id, queryCallback)
77  self.fieldRecalculator.updateSiteCleanupFields(siteId, queryCallback)
78  response.statuses.append(GeneralResponse.ERROR_OK)
79  else:
80  response.statuses.append(TASK_NOT_EXIST_ERR)
81 
82  return response
83 
84 
85  # # update data in sites table
86  #
87  # @param siteUpdate instance of SiteUpdate object
88  # @param queryCallback function for queries execution
89  def updateSite(self, siteUpdate, queryCallback):
90  SQL_UPDATE_SITE_SQL_TEMPLATE = "UPDATE sites SET %s WHERE `Id` = '%s'"
91  fields, values = Constants.getFieldsValuesTuple(siteUpdate, Constants.siteDict)
92  fieldValueString = Constants.createFieldsValuesString(fields, values, Constants.siteExcludeList)
93  query = SQL_UPDATE_SITE_SQL_TEMPLATE % (fieldValueString, siteUpdate.id)
94  logger.debug('>>> updateSite query: ' + str(query))
95  queryCallback(query, Constants.PRIMARY_DB_ID)
96 
97 
98  # #update data in sites_filters table
99  #
100  # @param siteUpdate instance of SiteUpdate object
101  # @param queryCallback function for queries execution
102  def updateSitesFilter(self, siteUpdate, queryCallback):
103  if siteUpdate.filters is not None:
104  if siteUpdate.updateType == dc.EventObjects.SiteUpdate.UPDATE_TYPE_APPEND:
105  super(SiteUpdateTask, self).addSitesFilter(siteUpdate, queryCallback)
106  if siteUpdate.updateType == dc.EventObjects.SiteUpdate.UPDATE_TYPE_OVERWRITE:
107  query = Constants.DEL_BY_ID_QUERY_TEMPLATE % ("sites_filters", siteUpdate.id)
108  queryCallback(query, Constants.PRIMARY_DB_ID)
109  super(SiteUpdateTask, self).addSitesFilter(siteUpdate, queryCallback)
110  if siteUpdate.updateType == dc.EventObjects.SiteUpdate.UPDATE_TYPE_UPDATE:
111  super(SiteUpdateTask, self).updateSitesFilter(siteUpdate, queryCallback)
112 
113 
114  # #update data in sites_properties table
115  #
116  # @param siteUpdate instance of SiteUpdate object
117  # @param queryCallback function for queries execution
118  def updateSiteProperties(self, siteUpdate, queryCallback):
119  if siteUpdate.properties is not None:
120  if siteUpdate.updateType == dc.EventObjects.SiteUpdate.UPDATE_TYPE_APPEND:
121  super(SiteUpdateTask, self).addSiteProperties(siteUpdate, queryCallback)
122  if siteUpdate.updateType == dc.EventObjects.SiteUpdate.UPDATE_TYPE_OVERWRITE:
123  query = Constants.DEL_BY_ID_QUERY_TEMPLATE % ("sites_properties", siteUpdate.id)
124  queryCallback(query, Constants.PRIMARY_DB_ID)
125  super(SiteUpdateTask, self).addSiteProperties(siteUpdate, queryCallback)
126  if siteUpdate.updateType == dc.EventObjects.SiteUpdate.UPDATE_TYPE_UPDATE:
127  super(SiteUpdateTask, self).updateSiteProperties(siteUpdate, queryCallback)
128 
129 
130  # #update data in sites_urls table
131  #
132  # @param siteUpdate instance of SiteUpdate object
133  # @param queryCallback function for queries execution
134  def updateSiteURL(self, siteUpdate, queryCallback):
135  if siteUpdate.urls is not None:
136  site = dc.EventObjects.Site("")
137  site.urls = siteUpdate.urls
138  site.id = siteUpdate.id
139  if siteUpdate.updateType == dc.EventObjects.SiteUpdate.UPDATE_TYPE_APPEND:
140  super(SiteUpdateTask, self).addSiteURLSites(site, queryCallback)
141  if siteUpdate.updateType == dc.EventObjects.SiteUpdate.UPDATE_TYPE_OVERWRITE:
142  URL_TABLE_DEL_TEMPLATE = "DELETE FROM `sites_urls` WHERE `Site_Id` = '%s'"
143  query = URL_TABLE_DEL_TEMPLATE % siteUpdate.id
144  queryCallback(query, Constants.PRIMARY_DB_ID)
145  super(SiteUpdateTask, self).addSiteURLSites(site, queryCallback)
146  if siteUpdate.updateType == dc.EventObjects.SiteUpdate.UPDATE_TYPE_UPDATE:
147  super(SiteUpdateTask, self).updateSiteURLSites(siteUpdate, queryCallback)
148 
149 
150  # #update data in urls_ table
151  #
152  # @param urls objects list
153  # @param queryCallback function for queries execution
154  def updateURlsURL(self, urls, siteId, queryCallback):
155  query = None
156  urlMd5Defined = None
157  for urlObject in urls:
158  try:
159  if urlObject is not None:
160  if urlObject.url is not None:
161  urlMd5 = hashlib.md5(urlObject.url).hexdigest()
162  else:
163  urlMd5 = None
164  # Check is URL already exists with urlMd5 different from got to update
165  if urlObject.urlMd5 is not None and urlObject.url is not None:
166  query = str(Constants.URL_URL_SQL_SELECT_COUNT + "`URL`='%s' AND `URLMd5`<>'%s'") % \
167  (siteId, Utils.escape(urlObject.url), urlObject.urlMd5)
168  r = queryCallback(query, Constants.SECONDARY_DB_ID)
169  if r is None or (len(r) > 0) and len(r[0]) > 0 and r[0][0] > 0:
170  logger.error("Root URL '%s' already exists in dc_urls.urls_%s !", str(urlObject.url), siteId)
171  continue
172  if urlObject.url is not None and urlObject.urlMd5 is None:
173  urlMd5Defined = False
174  urlObject.urlMd5 = urlMd5
175  elif urlObject.urlMd5 is not None:
176  urlMd5Defined = True
177  else:
178  urlMd5Defined = False
179  fields, values = Constants.getFieldsValuesTuple(urlObject, Constants.URLTableDict)
180  fieldValueString = Constants.createFieldsValuesString(fields, values)
181  if urlMd5Defined is False and urlObject.url is not None:
182  query = str(Constants.URL_URL_SQL_UPDATE + " `URL`='%s'") % (siteId, fieldValueString, urlObject.url)
183  elif urlMd5Defined is True:
184  query = str(Constants.URL_URL_SQL_UPDATE + " `URLMd5`='%s'") % (siteId, fieldValueString, urlObject.urlMd5)
185  else:
186  query = None
187  if query is not None:
188  queryCallback(query, Constants.SECONDARY_DB_ID)
189  else:
190  logger.error('No url or urlMd5 value in urlObject, update query not created!\n%s' + str(urlObject))
191  # Additionally update the urlMd5 after URL update to synch them if urlMd5 not matched
192  if urlMd5Defined and urlMd5 != urlObject.urlMd5:
193  query = (Constants.URL_URL_SQL_UPDATE + " `URL`='%s'") % (siteId, 'URLMd5="' + str(urlMd5) + '"',
194  Utils.escape(urlObject.url))
195  logger.debug('Update old URLMd5 with new: %s for url: %s, query: %s', urlMd5, urlObject.url, str(query))
196  queryCallback(query, Constants.SECONDARY_DB_ID)
197  except Exception, err:
198  logger.error("Error: %s, query:%s, urlMd5Defined: %s, urlObject.url: %s", str(err), str(query),
199  str(urlMd5Defined), str(urlObject.url))
200  logger.error(Utils.getTracebackInfo())
201 
202 
203  # #reset the site's row in the sites table on restart (re-sync of sites_urls is still a TODO, see below)
204  #
205  # @param siteUpdate instance of SiteUpdate object
206  # @param queryCallback function for queries execution
207  def restartSite(self, siteUpdate, queryCallback):
208  if siteUpdate.state == siteUpdate.STATE_RESTART:
209  UPDATE_SITE_SQL_QUERY = ("UPDATE `sites` SET `State`=%s, `UDate`=NOW(), " +
210  "`Iterations`=`Iterations`+1, `ErrorMask`=0, `Errors`=0 WHERE Id='%s'")
211  query = UPDATE_SITE_SQL_QUERY % (str(dc.EventObjects.Site.STATE_ACTIVE), siteUpdate.id)
212  queryCallback(query, Constants.PRIMARY_DB_ID)
213  # TODO: need to remove the URLs from urls_ and then insert from sites_urls
214  # siteUpdate.urls = []
215  # SELECT_QUERY = "SELECT `URL` FROM `sites_urls` WHERE `Site_Id` = '%s'"
216  # query = SELECT_QUERY % siteUpdate.id
217  # res = queryCallback(query, Constants.PRIMARY_DB_ID)
218  # if hasattr(res, '__iter__'):
219  # for row in res:
220  # siteUpdate.urls.append(dc.EventObjects.URL(siteUpdate.id, row[0]))
221  # super(SiteUpdateTask, self).addSiteURLURLs(siteUpdate, queryCallback)
222 
223 
224  # #deleteRootUrlsFromURLsUrls method deletes all root urls from the dc_urls database [siteId's table]
225  #
226  # @param siteId current siteId
227  # @param queryCallback function for queries execution
228  def deleteRootUrlsFromURLsUrls(self, siteId, queryCallback):
229  logger.debug("Delete all root URLs from dc_urls.urls_* table")
230  urlDeleteObj = dc.EventObjects.URLDelete(siteId, None,
231  reason=dc.EventObjects.URLDelete.REASON_SITE_UPDATE_ROOT_URLS)
232  # whereCriterionStr = \
233  # "`URLMd5` IN (SELECT `URLMd5` FROM dc_sites.sites_urls WHERE `Site_Id` = '%s')" % siteId
234  whereCriterionStr = "`ParentMd5`=''"
235  urlDeleteObj.criterions = {app.SQLCriterions.CRITERION_WHERE : whereCriterionStr}
236  self.urlDeleteTask.process([urlDeleteObj], queryCallback)
237 
238 
239  # #copyURLsFromSiteURlsToURLsURLs copies all root urls from sites_urls to the site's dc_urls table
240  #
241  # @param siteId current siteId
242  # @param queryCallback function for queries execution
243  def copyURLsFromSiteURlsToURLsURLs(self, siteId, queryCallback):
244  if len(Constants.URLTableDict) > 0:
245  tbName = Constants.DC_URLS_TABLE_NAME_TEMPLATE % siteId
246  query = "INSERT IGNORE INTO `%s` ("
247  query = query % tbName
248  fields = ''
249  for elem in Constants.URLTableDict.values():
250  fields += elem
251  fields += ','
252 
253  fields = fields[:-1]
254  query += fields
255  query += (") SELECT ")
256  query += fields
257  query += (" FROM dc_sites.sites_urls WHERE `Site_Id`='%s' GROUP BY `URLMd5`" % siteId)
258 
259  queryCallback(query, Constants.SECONDARY_DB_ID)
def isSiteExist(self, siteId, queryCallback, userId=None)
Definition: BaseTask.py:29
def updateSiteURLSites(self, site, queryCallback)
Definition: SiteTask.py:277
def addSiteURLSites(self, site, queryCallback)
Definition: SiteTask.py:264
GeneralResponse event object, represents general state response for multipurpose usage.
def restartSite(self, siteUpdate, queryCallback)
def process(self, siteUpdate, queryCallback)
def updateSiteURL(self, siteUpdate, queryCallback)
def copyURLsFromSiteURlsToURLsURLs(self, siteId, queryCallback)
def addSitesFilter(self, site, queryCallback)
Definition: SiteTask.py:121
def deleteRootUrlsFromURLsUrls(self, siteId, queryCallback)
def updateSitesFilter(self, siteUpdate, queryCallback)
def updateURlsURL(self, urls, siteId, queryCallback)
def updateSitesFilter(self, site, queryCallback)
Definition: SiteTask.py:141
def updateSiteProperties(self, site, queryCallback)
Definition: SiteTask.py:241
def __init__(self, dcSiteTemplate, keyValueDefaultFile, keyValueStorageDir, rawDataDir, dBDataTask, dcStatTemplate, dcLogTemplate, dcAttrTemplate)
def addSiteProperties(self, site, queryCallback)
Definition: SiteTask.py:222
def updateSiteProperties(self, siteUpdate, queryCallback)
def updateSite(self, siteUpdate, queryCallback)
def fetchByCriterions(self, criterions, queryCallback)
Definition: BaseTask.py:55