HCE Project Python language Distributed Tasks Manager Application, Distributed Crawler Application and client API bindings.  2.0.0-chaika
Hierarchical Cluster Engine Python language binding
SiteCleanUpTask.py
Go to the documentation of this file.
1 '''
2 @package: dc
3 @author igor
4 @link: http://hierarchical-cluster-engine.com/
5 @copyright: Copyright © 2013-2014 IOIX Ukraine
6 @license: http://hierarchical-cluster-engine.com/license/
7 @since: 0.1
8 '''
9 
10 import os
11 import shutil
12 import tempfile
13 import traceback
14 import sys
15 
16 import dc.EventObjects
17 from dc_db.BaseTask import BaseTask
18 import dc_db.Constants as Constants
19 from dc_db.FieldRecalculator import FieldRecalculator
20 from dc_db.StatisticLogManager import StatisticLogManager
21 from dtm.EventObjects import GeneralResponse # pylint: disable=unused-import
22 import app.Utils as Utils # pylint: disable=F0401
23 
24 logger = Utils.MPLogger().getLogger()
25 
26 
27 # template for key value file name
28 KEY_VALUE_FILE_NAME_TEMPLATE = "%s.db"
29 KEY_VALUE_FIELDS_FILE_NAME_TEMPLATE = "%s_fields.db"
30 
31 # #class implementing all the logic necessary to process a SiteCleanUp request
32 #
34 
35 
36  # #constructor
37  #
38  # @param keyValueStorageDir path to keyValue storage work dir
39  # @param rawDataDir path to raw data dir
def __init__(self, keyValueStorageDir, rawDataDir, dBDataTask):
    """Remember the storage locations and reset the error state.

    @param keyValueStorageDir path to keyValue storage work dir
    @param rawDataDir path to raw data dir; cleanUpRawDataStorage removes
           the "<rawDataDir>/<site id>" directory
    @param dBDataTask object whose process(request, queryCallback) method
           is called by cleanUpDBStorage; None disables that step
    """
    self.keyValueStorageDir = keyValueStorageDir
    self.rawDataDir = rawDataDir
    self.errorCode = 0
    self.errorMessage = "OK"
    self.dBDataTask = dBDataTask
    # process() calls self.fieldRecalculator.updateSiteCleanupFields(), but
    # no visible line assigns the attribute.
    # NOTE(review): the listing skips one line right here; presumably it
    # held this assignment - confirm against the real source.
    self.fieldRecalculator = FieldRecalculator()
47 
48 
49  # #make all necessary actions to clean up all site data
50  #
51  # @param siteCleanups instance of SiteCleanup object, or a list of such objects
52  # @param queryCallback function for queries execution
53  # @return generalResponse instance of GeneralResponse object
def process(self, siteCleanups, queryCallback):
    """Clean up the data of one or several sites and report the outcome.

    A single request is wrapped into a list. For every request whose site
    exists, the per-site URL table is cleaned, the log/frequency tables
    are truncated according to historyCleanUp, the `sites` record is
    reset, and the attributes table is truncated. A request for an
    unknown site only records an error.

    NOTE(review): the nesting below was reconstructed from a listing whose
    indentation was flattened. In particular, confirm which calls belong
    to the NOT_DELAYED_OPERATION branch.

    @param siteCleanups SiteCleanup object or a list of such objects
    @param queryCallback function for queries execution
    @return GeneralResponse; statuses receives one error code per request,
            errorCode holds the code of the last request, and errorMessage
            joins the per-request messages with "-"
    """
    ret = GeneralResponse()
    if not isinstance(siteCleanups, list):
        siteCleanups = [siteCleanups]

    historyLog = dc.EventObjects.SiteCleanup.HISTORY_CLEANUP_LOG
    historyFull = dc.EventObjects.SiteCleanup.HISTORY_CLEANUP_FULL

    for siteCleanup in siteCleanups:
        # Error state is tracked per request, so reset it every time.
        self.errorCode = 0
        self.errorMessage = "OK"
        if self.isSiteExist(siteCleanup.id, queryCallback):
            self.cleanUpMysqlStorage(siteCleanup, queryCallback)
            # Both LOG and FULL history modes empty the log table ...
            if siteCleanup.historyCleanUp in (historyLog, historyFull):
                self.trancateArbitraryTable(Constants.DC_LOG_TABLE_NAME_TEMPLATE, siteCleanup,
                                            Constants.LOG_DB_ID, queryCallback)
            # ... and only FULL also empties the frequency table.
            if siteCleanup.historyCleanUp == historyFull:
                self.trancateArbitraryTable(Constants.DC_FREQ_TABLE_NAME_TEMPLATE, siteCleanup,
                                            Constants.STAT_DB_ID, queryCallback)
            if siteCleanup.delayedType == dc.EventObjects.NOT_DELAYED_OPERATION:
                self.cleanUpDBStorage(siteCleanup, KEY_VALUE_FILE_NAME_TEMPLATE, queryCallback)
                self.cleanUpDBStorage(siteCleanup, KEY_VALUE_FIELDS_FILE_NAME_TEMPLATE, queryCallback)
                self.cleanUpRawDataStorage(siteCleanup)
            self.cleanUpMysqlSiteTable(siteCleanup, queryCallback)
            if siteCleanup.moveURLs:
                self.copyUrlsToDcUrls(siteCleanup.id, queryCallback)
            self.fieldRecalculator.updateSiteCleanupFields(siteCleanup.id, queryCallback)

            # clean up attributes
            self.trancateArbitraryTable(Constants.DC_ATT_TABLE_NAME_TEMPLATE, siteCleanup,
                                        Constants.ATT_DB_ID, queryCallback)
        else:
            self.errorCode = Constants.EXIT_CODE_GLOBAL_ERROR
            self.errorMessage = (">>> Site id [%s] not found" % siteCleanup.id)

        # Fold this request's outcome into the shared response object.
        ret.errorCode = self.errorCode
        ret.statuses.append(ret.errorCode)
        if ret.errorMessage is None or ret.errorMessage == "":
            ret.errorMessage = self.errorMessage
        else:
            ret.errorMessage += ("-" + self.errorMessage)

    return ret
95 
96 
97  # #method updates the statistics records for the URLs selected by the given query
98  #
99  # @param sqlTemplate SQL template of the query that selects the URLMd5 values
100  # @param siteCleanup instance of SiteCleanup object
101  # @param queryCallback function for queries execution
def staticUpdate(self, sqlTemplate, siteCleanup, queryCallback):
    """Report every URL selected by sqlTemplate to the statistics log.

    The template is filled with the site's URL table name and executed on
    the secondary db. For each returned row whose first column is not
    None, StatisticLogManager.statisticUpdate is called with
    FREQ_DELETED_STATE, a count of 1, the site id and that column value.

    @param sqlTemplate SQL template with one %s placeholder for the table
           name; the first selected column is passed on as the URL key
    @param siteCleanup instance of SiteCleanup object
    @param queryCallback function for queries execution
    """
    tbName = Constants.DC_URLS_TABLE_NAME_TEMPLATE % siteCleanup.id
    rows = queryCallback(sqlTemplate % tbName, Constants.SECONDARY_DB_ID)
    if rows is None:
        return
    for row in rows:
        if row[0] is not None:
            StatisticLogManager.statisticUpdate(queryCallback, Constants.StatFreqConstants.FREQ_DELETED_STATE, 1,
                                                siteCleanup.id, row[0])
111 
112 
113  # #cleanup all site data from mysql db
114  #
115  # @param siteCleanup instance of SiteCleanup object
116  # @param queryCallback function for queries execution
def cleanUpMysqlStorage(self, siteCleanup, queryCallback):
    """Remove the site's URL records from the per-site URLs table.

    With saveRootUrls set, only rows with a non-empty ParentMd5 are
    affected; otherwise the whole table is truncated. The affected URLs
    are first reported through staticUpdate. For a delayed operation the
    rows are copied (fourth db) into a table created with
    Constants.SQL_CREATE_QUERY_TEMPLATE before they are removed. Any
    other delayedType value leaves the table untouched.

    NOTE(review): nesting reconstructed from a listing with flattened
    indentation; the final delete is assumed to belong to the delayed
    branch.

    @param siteCleanup instance of SiteCleanup object
    @param queryCallback function for queries execution
    """
    tbName = Constants.DC_URLS_TABLE_NAME_TEMPLATE % siteCleanup.id
    if siteCleanup.saveRootUrls:
        selectTemplate = "SELECT `URLMd5` FROM %s WHERE ParentMd5 != ''"
        copyQuery = ("INSERT INTO %s SELECT * FROM dc_urls.%s WHERE dc_urls.%s.ParentMd5 != ''" %
                     (tbName, tbName, tbName))
        deleteQuery = "DELETE FROM `%s` WHERE ParentMd5 != ''" % tbName
    else:
        selectTemplate = "SELECT `URLMd5` FROM %s"
        copyQuery = "INSERT INTO %s SELECT * FROM dc_urls.%s" % (tbName, tbName)
        deleteQuery = "TRUNCATE TABLE `%s`" % tbName

    # Statistics must be collected while the rows still exist.
    self.staticUpdate(selectTemplate, siteCleanup, queryCallback)

    if siteCleanup.delayedType == dc.EventObjects.NOT_DELAYED_OPERATION:
        queryCallback(deleteQuery, Constants.SECONDARY_DB_ID)
    elif siteCleanup.delayedType == dc.EventObjects.DELAYED_OPERATION:
        # Create the target table and copy the rows before deleting them.
        queryCallback(Constants.SQL_CREATE_QUERY_TEMPLATE % (tbName, tbName), Constants.FOURTH_DB_ID)
        queryCallback(copyQuery, Constants.FOURTH_DB_ID)
        queryCallback(deleteQuery, Constants.SECONDARY_DB_ID)
140 
141 
142  # #method truncates an arbitrary per-site table in the specified db
143  #
144  # @param siteCleanup instance of SiteCleanup object
145  # @param dbId specific db id
146  # @param queryCallback function for queries execution
def trancateArbitraryTable(self, tablePrefix, siteCleanup, dbId, queryCallback):
    """Truncate the per-site table derived from tablePrefix.

    @param tablePrefix table name template with one %s placeholder that is
           filled with siteCleanup.id
    @param siteCleanup instance of SiteCleanup object
    @param dbId id of the db the query is executed on
    @param queryCallback function for queries execution, called as
           queryCallback(query, dbId)
    """
    tbName = tablePrefix % siteCleanup.id
    queryCallback("TRUNCATE TABLE `%s`" % tbName, dbId)
152 
153 
154  # #sets empty values in `sites` table for some fields
155  #
156  # @param siteCleanup instance of SiteCleanup object
157  # @param queryCallback function for queries execution
def cleanUpMysqlSiteTable(self, siteCleanup, queryCallback):
    """Reset the counters of the site's record in the `sites` table.

    Sets TcDate to NOW(), zeroes Resources, Iterations, ErrorMask, Errors,
    Contents and CollectedURLs, and stores siteCleanup.state as the new
    State. The query runs on the primary db.

    @param siteCleanup instance of SiteCleanup object
    @param queryCallback function for queries execution
    """
    clearSiteRecordSql = ("UPDATE `sites` SET TcDate=NOW(), Resources=0, Iterations=0, State=%s, " +
                          "ErrorMask=0, Errors=0, Contents=0, CollectedURLs=0 WHERE id = '%s'")
    query = clearSiteRecordSql % (str(siteCleanup.state), siteCleanup.id)
    queryCallback(query, Constants.PRIMARY_DB_ID)
163 
164 
165  # #cleanup all site data from keyvalue db
166  #
167  # @param siteCleanup instance of SiteCleanup object
def cleanUpDBStorage(self, siteCleanup, filesSuffix, queryCallback):
    """Ask the data task to delete the site's key-value data.

    @param siteCleanup instance of SiteCleanup object
    @param filesSuffix file name template passed on to DataDeleteRequest
    @param queryCallback function for queries execution
    @return whatever self.dBDataTask.process returns, or None when no
            data task was configured
    """
    if self.dBDataTask is None:
        return None
    dataDeleteRequest = dc.EventObjects.DataDeleteRequest(siteCleanup.id, None, filesSuffix)
    return self.dBDataTask.process(dataDeleteRequest, queryCallback)
174 
175 
176  # #cleanup all site data from raw data storage
177  #
178  # @param siteCleanup instance of SiteCleanup object
def cleanUpRawDataStorage(self, siteCleanup):
    """Remove the site's directory from the raw data storage.

    The directory "<rawDataDir>/<site id>" is first renamed to a unique
    temporary name inside rawDataDir and then deleted recursively, so the
    original path disappears in a single rename step.

    Failures are only logged at debug level. self.errorCode and
    self.errorMessage are deliberately left untouched, so a missing raw
    data directory does not fail the whole clean-up.

    @param siteCleanup instance of SiteCleanup object
    """
    try:
        # Only the generated unique name is needed; the context manager
        # closes (and thereby removes) the temporary file right away.
        with tempfile.NamedTemporaryFile() as tmpFile:
            tmpBaseName = os.path.basename(tmpFile.name)
        tmpDirName = self.rawDataDir + "/" + tmpBaseName
        originDirName = self.rawDataDir + "/" + siteCleanup.id
        logger.debug(">>> originDir = %s", str(originDirName))
        os.rename(originDirName, tmpDirName)
        shutil.rmtree(tmpDirName)
    except Exception as err:
        type_, value_, traceback_ = sys.exc_info()
        logger.debug("type_ = %s, value_ = %s", str(type_), str(value_))
        stack = traceback.format_tb(traceback_)
        # Log only the innermost frame of the traceback.
        logger.debug("Error: %s\n%s", str(err), str(stack[-1]))
        # NOTE(review): assumed to be part of the error path; the listing's
        # flattened indentation does not show it - confirm.
        logger.debug(">>> [cleanUpRawDataStorage] CURRENT DIR %s", str(os.getcwd()))
def isSiteExist(self, siteId, queryCallback, userId=None)
Definition: BaseTask.py:29
def cleanUpMysqlStorage(self, siteCleanup, queryCallback)
GeneralResponse event object, represents general state response for multipurpose usage.
def staticUpdate(self, sqlTemplate, siteCleanup, queryCallback)
def trancateArbitraryTable(self, tablePrefix, siteCleanup, dbId, queryCallback)
def __init__(self, keyValueStorageDir, rawDataDir, dBDataTask)
def process(self, siteCleanups, queryCallback)
def cleanUpRawDataStorage(self, siteCleanup)
def cleanUpMysqlSiteTable(self, siteCleanup, queryCallback)
def copyUrlsToDcUrls(self, siteId, queryCallback)
Definition: BaseTask.py:115
def cleanUpDBStorage(self, siteCleanup, filesSuffix, queryCallback)