HCE Project Python language Distributed Tasks Manager Application, Distributed Crawler Application and client API bindings.  2.0.0-chaika
Hierarchical Cluster Engine Python language binding
URLPurgeTask.py
Go to the documentation of this file.
1 '''
2 @package: dc
3 @author scorp
4 @link: http://hierarchical-cluster-engine.com/
5 @copyright: Copyright © 2013-2014 IOIX Ukraine
6 @license: http://hierarchical-cluster-engine.com/license/
7 @since: 0.1
8 '''
9 
10 import copy
11 import hashlib
12 import dc.EventObjects
13 from dtm.EventObjects import GeneralResponse
14 import dc_db.Constants as Constants
15 from dc_db.BaseTask import BaseTask
16 from dc_db.URLCleanupTask import URLCleanUpTask
17 from dc_db.StatisticLogManager import StatisticLogManager
18 import app.Utils as Utils # pylint: disable=F0401
19 
20 logger = Utils.MPLogger().getLogger()
21 
22 
23 # #process urlPurge event
25 
26  # #constructor
27  #
28  # @param keyValueStorageDir path to keyValue storage work dir
29  # @param rawDataDir path to raw data dir
def __init__(self, keyValueStorageDir, rawDataDir, dBDataTask):
    """Build the purge task together with its URLCleanUpTask helper.

    @param keyValueStorageDir path to keyValue storage work dir
    @param rawDataDir path to raw data dir
    @param dBDataTask db data task object, kept on self and passed to URLCleanUpTask
    """
    super(URLPurgeTask, self).__init__()
    self.dBDataTask = dBDataTask
    self.uRLCleanUpTask = URLCleanUpTask(keyValueStorageDir, rawDataDir, dBDataTask)
    # md5 used by the most recent deleteUrlDBField call; None until the first deletion
    self.urlMd5 = None
35 
36 
37  # #Checks whether the given table contains urls matching the purge criterions
38  #
39  # @param urlPurge base purge object
40  # @param tbName table name
41  # @param queryCallback function for queries execution
42  # @return bool value
def isAvailableUrls(self, urlPurge, tbName, queryCallback):
    """Tell whether table tbName holds urls matching urlPurge.criterions.

    The site id handed to extractUrlByCriterions is tbName without its first
    five characters (presumably a "urls_" prefix -- TODO confirm against
    Constants.DC_URLS_TABLE_NAME_TEMPLATE).

    @param urlPurge base purge object
    @param tbName table name
    @param queryCallback function for queries execution
    @return True when at least one url was extracted, otherwise False
    """
    siteId = tbName[5:]
    foundUrls = self.uRLCleanUpTask.extractUrlByCriterions(siteId, True, urlPurge.criterions,
                                                           queryCallback, Constants.FOURTH_DB_ID)
    hasUrls = foundUrls is not None and len(foundUrls) > 0
    if not hasUrls:
        logger.debug(">>> Not content urls by criterions, bdName = " + tbName)
    else:
        logger.debug(">>> Has urls by criterions, bdName = " + tbName)
    return hasUrls
54 
55 
56 
57  # #creates and returns a purges list: one copy of urlPurge per 'urls_deleted' table (within siteLimits) that has urls matching the purge criterions
58  #
59  # @param urlPurge base purge object
60  # @param siteLimits limitations on tables list
61  # @param queryCallback function for queries execution
62  # @return purges list
def getAdditionPurges(self, urlPurge, siteLimits, queryCallback):
    """Expand a purge request into per-site purges, one per matching table.

    Lists tables with "SHOW TABLES" on the FOURTH_DB_ID connection. Starting at
    index siteLimits[0], it deep-copies urlPurge for every table that has urls
    matching the purge criterions. It stops once siteLimits[1] copies were made,
    or keeps going through all tables when siteLimits[1] equals
    URLPurge.ALL_SITES. Each copy gets siteId = table name without its first
    five characters and url = None.

    @param urlPurge base purge object
    @param siteLimits [start, count] limitations on tables list
    @param queryCallback function for queries execution
    @return purges list (empty when siteLimits is invalid or the query returned None)
    """
    purges = []
    limitsOk = siteLimits is not None and hasattr(siteLimits, '__iter__') and \
        len(siteLimits) >= 2 and int(siteLimits[0]) >= 0
    if not limitsOk:
        logger.error(">>> siteLimits field must be type of [x, x] and not None")
        return purges

    tables = queryCallback("SHOW TABLES", Constants.FOURTH_DB_ID)
    if tables is None:
        return purges

    firstIndex = int(siteLimits[0])
    maxPurges = int(siteLimits[1])
    if maxPurges == dc.EventObjects.URLPurge.ALL_SITES:
        maxPurges = len(tables)

    for index in range(firstIndex, len(tables)):
        if len(purges) >= maxPurges:
            break
        row = tables[index]
        if row is None or row[0] is None:
            continue
        tableName = row[0]
        if self.isAvailableUrls(urlPurge, tableName, queryCallback):
            sitePurge = copy.deepcopy(urlPurge)
            sitePurge.siteId = tableName[5:]
            sitePurge.url = None
            purges.append(sitePurge)
    return purges
86 
87 
88  # #checks whether the site's table exists among the urls_deleted tables
89  #
90  # @param siteId sites id of looking table
91  # @param queryCallback function for queries execution
92  # @return bool value
def isDeleteTableExist(self, siteId, queryCallback):
    """Tell whether the site's table is listed by "SHOW TABLES" on FOURTH_DB_ID.

    The expected name is Constants.DC_URLS_TABLE_NAME_TEMPLATE % siteId. A row
    matches when that name is one of its values.

    @param siteId sites id of looking table
    @param queryCallback function for queries execution
    @return True if the table is listed, otherwise False
    """
    tableName = Constants.DC_URLS_TABLE_NAME_TEMPLATE % siteId
    tables = queryCallback("SHOW TABLES", Constants.FOURTH_DB_ID)
    logger.debug(">>> Delete tables = " + str(tables))
    if tables is None or not hasattr(tables, '__iter__'):
        return False
    return any(row is not None and hasattr(row, '__iter__') and tableName in row
               for row in tables)
105 
106 
107  # #make all necessary actions to purging urls data from db
108  #
109  # @param urlPurges list of URLPurge objects
110  # @param queryCallback function for queries execution
111  # @return generalResponse instance of GeneralResponse object
def process(self, urlPurges, queryCallback):
    """Purge the data of every url selected by the given purge requests.

    For each purge whose site table exists, urls come either from urlPurge.url
    or, when url is None, from extractUrlByCriterions. Every url without a live
    record in dc_urls is removed from data storage, raw storage and the
    urls_deleted table, and is counted.

    Purges whose siteId is None are additionally expanded into per-site purges
    via getAdditionPurges. All work is done on a deep copy of urlPurges, so the
    caller's URLPurge objects are never modified.

    @param urlPurges list of URLPurge objects
    @param queryCallback function for queries execution
    @return generalResponse instance of GeneralResponse object; its statuses get
            one [siteId, purgedUrlsCount] entry per processed purge
    """
    generalResponse = GeneralResponse()

    # Always work on copies: the loop below overwrites urlPurge.url and urlPurge.siteId.
    # Iterating the caller's list (as was done when no additional purges were
    # generated) leaked those overwrites back to the caller.
    workPurges = copy.deepcopy(urlPurges)
    for urlPurge in urlPurges:
        if urlPurge.siteId is None:
            logger.debug(">>> Site Limits = " + str(urlPurge.siteLimits))
            workPurges = workPurges + self.getAdditionPurges(urlPurge, urlPurge.siteLimits, queryCallback)

    if len(urlPurges) != len(workPurges):
        logger.debug(">>> Purges reassign")

    for urlPurge in workPurges:
        # @todo add more complex case
        urlsCount = 0
        if urlPurge.siteId == "":
            urlPurge.siteId = "0"
        if self.isDeleteTableExist(urlPurge.siteId, queryCallback):
            try:
                localUrls = []
                if urlPurge.url is None:
                    isUrlExtract = False
                    logger.debug(">>> UrlType = " + str(urlPurge.urlType))
                    if urlPurge.urlType == dc.EventObjects.URLStatus.URL_TYPE_URL:
                        isUrlExtract = True
                    localUrls = self.uRLCleanUpTask.extractUrlByCriterions(urlPurge.siteId, isUrlExtract,
                                                                           urlPurge.criterions, queryCallback,
                                                                           Constants.FOURTH_DB_ID)
                    # extractUrlByCriterions may return None (isAvailableUrls guards for it too)
                    if localUrls is None:
                        localUrls = []
                else:
                    localUrls.append(urlPurge.url)
                logger.debug(">>> [PURGE] localUrls size = " + str(len(localUrls)))
                for localUrl in localUrls:
                    try:
                        urlPurge.url = localUrl
                        # Only purge urls that have no live record left in dc_urls.
                        if not self.checkUrlInDcUrls(urlPurge, queryCallback):
                            self.uRLCleanUpTask.deleteFromDataStorage(urlPurge, queryCallback)
                            self.uRLCleanUpTask.deleteFromRawStorage(urlPurge)
                            # deleteUrlDBField also stores the md5 it used in self.urlMd5
                            self.deleteUrlDBField(urlPurge, queryCallback)
                            if self.urlMd5 is not None:
                                StatisticLogManager.statisticUpdate(queryCallback,
                                                                    Constants.StatFreqConstants.FREQ_PURGED_STATE, 1,
                                                                    urlPurge.siteId, self.urlMd5)
                            urlsCount = urlsCount + 1
                    except Exception as ex:
                        # Best effort: a failing url must not stop the remaining ones, but make it visible.
                        logger.error(">>> [PURGE] Some Type Exception [LOOP] = " + str(type(ex)) + " " + str(ex))
            except Exception as ex:
                logger.error(">>> [PURGE] Some Type Exception = " + str(type(ex)) + " " + str(ex))
        else:
            logger.debug(">>> [PURGE] Table not found, SiteId = " + str(urlPurge.siteId))

        generalResponse.statuses.append([urlPurge.siteId, urlsCount])
        logger.debug(">>> [PURGE] Rsult = " + str([urlPurge.siteId, urlsCount]))
    return generalResponse
164 
165 
166  # #deletes url record from MySQL "urls_deleted" db
167  #
168  # @param urlPurge URLPurge object whose url (or url md5) record is deleted
169  # @param queryCallback function for queries execution
def deleteUrlDBField(self, urlPurge, queryCallback):
    """Delete the url's record from the site table on the FOURTH_DB_ID connection.

    Side effect: the md5 used for the deletion is stored in self.urlMd5. It is
    the unescaped value; process() reads it afterwards for statistics.

    @param urlPurge URLPurge object; its url is md5-hashed when urlType is
                    URL_TYPE_URL, otherwise it is used as the md5 value as-is
    @param queryCallback function for queries execution
    """
    SQL_DELETE_TEMPLATE = "DELETE FROM %s WHERE `UrlMd5` = '%s'"
    dbName = Constants.DC_URLS_TABLE_NAME_TEMPLATE % urlPurge.siteId
    if urlPurge.urlType == dc.EventObjects.URLStatus.URL_TYPE_URL:
        self.urlMd5 = hashlib.md5(urlPurge.url).hexdigest()
    else:
        self.urlMd5 = urlPurge.url
    # In the non-URL branch the value comes straight from the request and is placed
    # inside a quoted SQL literal, so escape it. Doubling backslashes and single
    # quotes is safe with and without NO_BACKSLASH_ESCAPES, and it is a no-op
    # for genuine hex md5 values.
    escapedMd5 = ("%s" % (self.urlMd5,)).replace("\\", "\\\\").replace("'", "''")
    query = SQL_DELETE_TEMPLATE % (dbName, escapedMd5)
    queryCallback(query, Constants.FOURTH_DB_ID)
179 
180 
181  # #checkUrlInDcUrls method looks URL in main(DC_URLs) DB
182  #
183  # @param urlPurge URLPurge object whose url (or url md5) is looked up
184  # @param queryCallback function for queries execution
def checkUrlInDcUrls(self, urlPurge, queryCallback):
    """Tell whether the url still has a live record in the main dc_urls db.

    A record counts only if its tcDate is not among the tcDate values stored for
    the same UrlMd5 in the matching dc_urls_deleted table. The query runs on the
    SECONDARY_DB_ID connection.

    @param urlPurge URLPurge object; its url is md5-hashed when urlType is
                    URL_TYPE_URL, otherwise it is used as the md5 value as-is
    @param queryCallback function for queries execution
    @return True if a matching record exists, otherwise False
    """
    ret = False
    SQL_SELECT_TEMPLATE = "SELECT url FROM %s WHERE `UrlMd5` = '%s' AND `tcDate` NOT IN " + \
                          "(SELECT `tcDate` FROM dc_urls_deleted.%s WHERE `UrlMd5` = '%s') LIMIT 1"
    dbName = Constants.DC_URLS_TABLE_NAME_TEMPLATE % urlPurge.siteId
    if urlPurge.urlType == dc.EventObjects.URLStatus.URL_TYPE_URL:
        urlMd5 = hashlib.md5(urlPurge.url).hexdigest()
    else:
        urlMd5 = urlPurge.url
    # The non-URL branch takes the value straight from the request and places it
    # inside quoted SQL literals, so escape it. Doubling backslashes and single
    # quotes is a no-op for genuine hex md5 values.
    escapedMd5 = ("%s" % (urlMd5,)).replace("\\", "\\\\").replace("'", "''")
    query = SQL_SELECT_TEMPLATE % (dbName, escapedMd5, dbName, escapedMd5)
    res = queryCallback(query, Constants.SECONDARY_DB_ID)
    if res is not None and len(res) > 0:
        ret = True
    logger.debug(">>> [PURGE] checkUrlInDcUrls 'UrlMd5' = " + urlMd5)
    if ret:
        logger.debug(" has record in dc_urls")
    else:
        logger.debug(" DOESN'T has record in dc_urls")
    return ret
def isAvailableUrls(self, urlPurge, tbName, queryCallback)
Definition: URLPurgeTask.py:43
def deleteUrlDBField(self, urlPurge, queryCallback)
GeneralResponse event object, represents general state response for multipurpose usage.
def __init__(self, keyValueStorageDir, rawDataDir, dBDataTask)
Definition: URLPurgeTask.py:30
def isDeleteTableExist(self, siteId, queryCallback)
Definition: URLPurgeTask.py:93
def checkUrlInDcUrls(self, urlPurge, queryCallback)
def process(self, urlPurges, queryCallback)
def getAdditionPurges(self, urlPurge, siteLimits, queryCallback)
Definition: URLPurgeTask.py:63