HCE Project Python language Distributed Tasks Manager Application, Distributed Crawler Application and client API bindings.  2.0.0-chaika
Hierarchical Cluster Engine Python language binding
URLCleanupTask.py
Go to the documentation of this file.
1 '''
2 @package: dc
3 @author igor
4 @link: http://hierarchical-cluster-engine.com/
5 @copyright: Copyright © 2013-2014 IOIX Ukraine
6 @license: http://hierarchical-cluster-engine.com/license/
7 @since: 0.1
8 '''
9 
10 import os
11 import traceback
12 import sys
13 import shutil
14 from dc_db import FieldRecalculator
15 from dc_db import Constants
16 from dc_db.BaseTask import BaseTask
17 from dc_db.StatisticLogManager import StatisticLogManager
18 from dc_db.AttrDeleteTask import AttrDeleteTask
19 import dc.EventObjects
20 from app.Utils import PathMaker
21 import app.Utils as Utils # pylint: disable=F0401
22 from dtm.EventObjects import GeneralResponse
23 
24 
# Module-level logger obtained from the project's MPLogger helper; the task methods in this
# file write their debug and error messages through it.
logger = Utils.MPLogger().getLogger()
26 
27 
28 # #process urlCleanUp event
30 
31 
# #constructor
#
# @param keyValueStorageDir path to keyValue storage work dir
# @param rawDataDir path to raw data dir
# @param dBDataTask task object used by deleteFromDataStorage() (may be None)
def __init__(self, keyValueStorageDir, rawDataDir, dBDataTask):
    """Store the storage locations and collaborators used by the cleanup steps.

    @param keyValueStorageDir path to keyValue storage work dir (only stored here)
    @param rawDataDir path to raw data dir; read by deleteFromRawStorage()
    @param dBDataTask object whose process() is called by deleteFromDataStorage();
                      when it is None that step is skipped
    """
    super(URLCleanUpTask, self).__init__()
    self.keyValueStorageDir = keyValueStorageDir
    self.rawDataDir = rawDataDir
    # process() calls self.recalculator.commonRecalc(), so the attribute has to exist.
    # NOTE(review): the listing this file was recovered from skips one line at this spot;
    # the assignment below is reconstructed from the module's FieldRecalculator import -
    # confirm the class name against dc_db/FieldRecalculator.py.
    self.recalculator = FieldRecalculator.FieldRecalculator()
    self.dBDataTask = dBDataTask
    # MD5 of the URL currently being handled; process() assigns it for every URL.
    self.urlMd5 = None
43 
44 
# #make all necessary actions to clean up urls data in the db and in the storages
#
# @param urlCleanups list of URLCleanup objects
# @param queryCallback function for queries execution
# @return generalResponse instance of GeneralResponse object
def process(self, urlCleanups, queryCallback):
    """Run the cleanup sequence for every request and record one status per request.

    For each request an empty siteId is replaced by "0". If isSiteExist() rejects the
    site, False is recorded. Otherwise the URLs to handle are the request's own url or,
    when that is None, whatever extractUrlByCriterions() returns. The request object is
    modified in place: siteId may be replaced and url is overwritten with each handled URL.

    @param urlCleanups iterable of cleanup request objects; the attributes read here are
                       siteId, url, urlType, criterions and delayedType
    @param queryCallback function for queries execution
    @return GeneralResponse whose statuses list receives True or False for each request

    NOTE(review): block nesting was reconstructed from a listing with flattened
    indentation (updateMysqlDB and the attribute removal run for every URL; the
    recalculation and the True status once per request) - confirm against the original.
    """
    generalResponse = GeneralResponse()
    for urlCleanup in urlCleanups:
        # @todo add more complex case
        if urlCleanup.siteId == "":
            urlCleanup.siteId = "0"
        if not self.isSiteExist(urlCleanup.siteId, queryCallback):
            generalResponse.statuses.append(False)
            continue
        try:
            if urlCleanup.url is None:
                localUrls = self.extractUrlByCriterions(
                    urlCleanup.siteId,
                    (urlCleanup.urlType == dc.EventObjects.URLStatus.URL_TYPE_URL),
                    urlCleanup.criterions,
                    queryCallback)
            else:
                localUrls = [urlCleanup.url]
            for localUrl in localUrls:
                # The helper methods read the URL from the request object itself.
                urlCleanup.url = localUrl
                self.urlMd5 = self.calculateMd5FormUrl(urlCleanup.url, urlCleanup.urlType)
                StatisticLogManager.logUpdate(queryCallback, "LOG_URL_CLEANUP", urlCleanup, urlCleanup.siteId,
                                              self.urlMd5)
                if urlCleanup.delayedType == dc.EventObjects.NOT_DELAYED_OPERATION:
                    # self.deleteFromKeyValue(urlCleanup)
                    self.deleteFromDataStorage(urlCleanup, queryCallback)
                    self.deleteFromRawStorage(urlCleanup)
                self.updateMysqlDB(urlCleanup, queryCallback, urlCleanup.siteId)
                if urlCleanup.delayedType == dc.EventObjects.DELAYED_OPERATION:
                    self.copyUrlToDeleteDB(urlCleanup, queryCallback)

                # Call remove attributes
                AttrDeleteTask.deleteUrlsAttributes(urlCleanup.siteId, self.urlMd5, queryCallback)

            self.recalculator.commonRecalc(urlCleanup.siteId, queryCallback,
                                           dc.EventObjects.FieldRecalculatorObj.PARTITION_RECALC)
            # Record success only after the recalculation: if it raised after True had been
            # appended, the except branch would add a second status for the same request.
            generalResponse.statuses.append(True)
        except Exception:
            generalResponse.statuses.append(False)
            # Log the whole traceback including the exception message, not only the last frame.
            logger.error(traceback.format_exc())
    return generalResponse
91 
92 
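# #extractUrlByCriterions builds a SELECT from the criterions, executes it and returns the result
#
# @param siteId - site id used to build the urls table name
# @param isUrlExtract - True to select the `URL` column, False to select `URLMd5`
# @param criterions - incoming criterions for the SQL request
# @param queryCallback function for queries execution
# @param dbName - db id passed to queryCallback
# @param tablePrefix - template of the urls table name
# @return urls list (first column of every fetched row)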
def extractUrlByCriterions(self, siteId, isUrlExtract, criterions, queryCallback, dbName=Constants.SECONDARY_DB_ID,
                           tablePrefix=Constants.DC_URLS_TABLE_NAME_TEMPLATE):
    """Select URLs (or URL MD5 values) of one site that match the given criterions.

    @param siteId site id substituted into tablePrefix to get the table name
    @param isUrlExtract True selects the `URL` column, False selects `URLMd5`
    @param criterions criterions handed to generateCriterionSQL()
    @param queryCallback function for queries execution
    @param dbName db id passed to queryCallback
    @param tablePrefix table name template with one placeholder for the site id
    @return list with the first column of every returned row; empty when the callback
            result has no __iter__ attribute
    """
    urlsTable = tablePrefix % siteId
    selectTemplate = "SELECT `URL` FROM `%s`" if isUrlExtract else "SELECT `URLMd5` FROM `%s`"
    # The criterion SQL is appended as is, without a separator.
    sqlText = selectTemplate % urlsTable + self.generateCriterionSQL(criterions)
    rows = queryCallback(sqlText, dbName)
    if not hasattr(rows, '__iter__'):
        return []
    logger.debug(">>> Select URL len(res) = " + str(len(rows)))
    return [row[0] for row in rows]
114 
115 
# #Get site's fields
#
# @param siteId - id of the site whose row is selected
# @param queryCallback function for queries execution
# @param dbName - db id passed to queryCallback
# @return first row of the query result (the site's fields)
def getSiteFields(self, siteId, queryCallback, dbName=Constants.PRIMARY_DB_ID):
    """Fetch the `dc_sites`.`sites` row of one site.

    @param siteId value compared with the `Id` column
    @param queryCallback function for queries execution; called with the query, dbName
                         and Constants.EXEC_NAME
    @param dbName db id passed to queryCallback
    @return the first row when the callback returned at least one; otherwise the callback
            result unchanged (an empty sequence, or None)
    """
    # NOTE(review): siteId is interpolated into the SQL text unescaped - confirm that
    # callers validate it.
    query = "SELECT * FROM `dc_sites`.`sites` WHERE `Id` = '%s' LIMIT 1" % siteId
    res = queryCallback(query, dbName, Constants.EXEC_NAME)
    # A None result used to fail in len(); extractUrlByCriterions() guards its callback
    # result as well, so a non-sequence result is tolerated here too.
    if res is not None and len(res) > 0:
        res = res[0]
    return res
129 
130 
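# @todo urldelete - make base class
# #delete url data from the data storage through the dBDataTask object
#
# @param urlCleanup instance of URLDelete or URLCleanup object
# @param queryCallback function for queries execution
# @return result of dBDataTask.process() or None when dBDataTask is not set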
def deleteFromDataStorage(self, urlCleanup, queryCallback):
    """Ask the configured dBDataTask to delete the stored data of one URL.

    @param urlCleanup request object; siteId, url and urlType are read
    @param queryCallback function for queries execution, forwarded to dBDataTask.process()
    @return whatever dBDataTask.process() returns, or None when no dBDataTask is set
    """
    # @todo try block + check table name
    if self.dBDataTask is None:
        return None
    md5 = self.calculateMd5FormUrl(urlCleanup.url, urlCleanup.urlType)
    deleteRequest = dc.EventObjects.DataDeleteRequest(urlCleanup.siteId, md5, None)
    return self.dBDataTask.process(deleteRequest, queryCallback)
144 
145 
# #delete data from raw storage
#
# @param urlCleanup instance of URLDelete or URLCleanup object
def deleteFromRawStorage(self, urlCleanup):
    """Remove the raw-data directory of one URL and its parent directory if left empty.

    The directory is rawDataDir/<siteId>/<PathMaker(md5).getDir()>. Deletion is best
    effort: an OSError is logged and not propagated.

    @param urlCleanup request object; siteId, url and urlType are read
    """
    localUrlMd5 = self.calculateMd5FormUrl(urlCleanup.url, urlCleanup.urlType)
    dataDir = self.rawDataDir + '/' + urlCleanup.siteId + '/' + PathMaker(localUrlMd5).getDir()
    logger.debug(">>> CLEANUP DIR = " + str(dataDir))
    if not os.path.isdir(dataDir):
        return
    try:
        shutil.rmtree(dataDir)
        # Parent is everything before the last '/'; without a '/' the path itself is used.
        slashPos = dataDir.rfind('/')
        hiLevelDir = dataDir[:slashPos] if slashPos >= 0 else dataDir
        if not os.listdir(hiLevelDir):
            # os.rmdir removes only an empty directory, so anything created there after
            # the emptiness check is not deleted together with the directory.
            os.rmdir(hiLevelDir)
    except OSError as ex:
        # str(ex) carries errno and text; ex.message is empty for OSError raised with
        # several arguments and is not available on Python 3.
        logger.debug(">>> [%s] Dir delete error - MSG [%s]", dataDir, str(ex))
162 
163 
164  # #update data in mysql db
165  #
166  # @param urlCleanup instance of URLDelete or URLCleanup object
167  # @param queryCallback function for queries execution
168  # @param siteId Site Id
def updateMysqlDB(self, urlCleanup, queryCallback, siteId):
    """Update state, status and dates of one URL row in the site's urls table.

    `TcDate` is always set to NOW(). `state` and `status` receive the request's values;
    a value that is None leaves the corresponding column unchanged. When the requested
    status equals dc.EventObjects.URL.STATUS_NEW, `UDate` is moved back by the site's
    RecrawlPeriod in minutes, or set to NOW() if the site fields carry no RecrawlPeriod.

    @param urlCleanup instance of URLDelete or URLCleanup object; state, status, url,
                      urlType and siteId are read
    @param queryCallback function for queries execution
    @param siteId Site Id used to look up the site fields

    NOTE(review): the recovered listing lost its indentation, so it is not certain
    whether the `UDate`=NOW() branch belongs to the RecrawlPeriod check (as written
    here) or to the STATUS_NEW check - confirm against the original source.
    """
    # A supplied value becomes a quoted literal. A missing one falls back to the column
    # itself, which makes that assignment a no-op. The fallback used to be quoted as
    # well, which stored the text 'state' / 'status' instead of keeping the column.
    if urlCleanup.state is not None:
        stateExpr = "'%s'" % (urlCleanup.state,)
    else:
        stateExpr = "`state`"
    if urlCleanup.status is not None:
        statusExpr = "'%s'" % (urlCleanup.status,)
    else:
        statusExpr = "`status`"

    uDate = ''
    if urlCleanup.status == dc.EventObjects.URL.STATUS_NEW:
        sf = self.getSiteFields(siteId, queryCallback)
        if sf is not None and 'RecrawlPeriod' in sf:
            uDate = ", `UDate`=DATE_SUB(`UDate`, INTERVAL %s MINUTE)" % sf['RecrawlPeriod']
        else:
            uDate = ", `UDate`=NOW()"

    sqlt = "UPDATE `%s` SET `TcDate`=NOW()%s, `state` = %s, `status` = %s WHERE `URLMD5` = '%s' LIMIT 1"
    localUrlMd5 = self.calculateMd5FormUrl(urlCleanup.url, urlCleanup.urlType)
    # The table name is derived from the request's own siteId, as before.
    tableName = Constants.DC_URLS_TABLE_NAME_TEMPLATE % urlCleanup.siteId
    query = sqlt % (tableName, uDate, stateExpr, statusExpr, localUrlMd5)
    queryCallback(query, Constants.SECONDARY_DB_ID)
184 
185 
# #copy the url row into the table of the same name in the db addressed by FOURTH_DB_ID
#
# @param urlCleanup instance of URLDelete or URLCleanup object
# @param queryCallback function for queries execution
def copyUrlToDeleteDB(self, urlCleanup, queryCallback):
    """Copy the URL's row from `dc_urls` into the same-named table of the FOURTH_DB_ID db.

    Two queries are sent with Constants.FOURTH_DB_ID: first the one built from
    Constants.SQL_CREATE_QUERY_TEMPLATE with the table name, then an INSERT ... SELECT
    of the row whose `URLMD5` matches the request's URL.

    @param urlCleanup instance of URLDelete or URLCleanup object; siteId, url and
                      urlType are read
    @param queryCallback function for queries execution
    """
    SQL_COPY_QUERY_TEMPLATE = "INSERT INTO %s SELECT * FROM `dc_urls`.`%s` WHERE `URLMD5` = '%s'"
    tbName = Constants.DC_URLS_TABLE_NAME_TEMPLATE % urlCleanup.siteId
    # Derive the MD5 from the request, as the other storage helpers do, instead of relying
    # on self.urlMd5, which holds a value only while process() is running.
    localUrlMd5 = self.calculateMd5FormUrl(urlCleanup.url, urlCleanup.urlType)
    query = Constants.SQL_CREATE_QUERY_TEMPLATE % (tbName, tbName)
    queryCallback(query, Constants.FOURTH_DB_ID)
    query = SQL_COPY_QUERY_TEMPLATE % (tbName, tbName, localUrlMd5)
    queryCallback(query, Constants.FOURTH_DB_ID)
def process(self, urlCleanups, queryCallback)
def isSiteExist(self, siteId, queryCallback, userId=None)
Definition: BaseTask.py:29
def deleteFromRawStorage(self, urlCleanup)
def calculateMd5FormUrl(self, url, urlType, useNormilize=False)
Definition: BaseTask.py:188
def copyUrlToDeleteDB(self, urlCleanup, queryCallback)
GeneralResponse event object, represents general state response for multipurpose usage.
def getSiteFields(self, siteId, queryCallback, dbName=Constants.PRIMARY_DB_ID)
def deleteFromDataStorage(self, urlCleanup, queryCallback)
def __init__(self, keyValueStorageDir, rawDataDir, dBDataTask)
def extractUrlByCriterions(self, siteId, isUrlExtract, criterions, queryCallback, dbName=Constants.SECONDARY_DB_ID, tablePrefix=Constants.DC_URLS_TABLE_NAME_TEMPLATE)
def generateCriterionSQL(self, criterions, additionWhere=None, siteId=None)
Definition: BaseTask.py:46
def updateMysqlDB(self, urlCleanup, queryCallback, siteId)