HCE Project Python language Distributed Tasks Manager Application, Distributed Crawler Application and client API bindings.  2.0.0-chaika
Hierarchical Cluster Engine Python language binding
URLAgeTask.py
Go to the documentation of this file.
1 '''
2 @package: dc
3 @author scorp
4 @link: http://hierarchical-cluster-engine.com/
5 @copyright: Copyright © 2013-2014 IOIX Ukraine
6 @license: http://hierarchical-cluster-engine.com/license/
7 @since: 0.1
8 '''
9 
10 import dc_db.Constants as Constants
11 from dc_db.BaseTask import BaseTask
12 from dc_db.URLDeleteTask import URLDeleteTask
13 from dc_db.StatisticLogManager import StatisticLogManager
14 import dc.EventObjects
15 from dtm.EventObjects import GeneralResponse
16 import app.Utils as Utils # pylint: disable=F0401
17 
18 logger = Utils.MPLogger().getLogger()
19 
20 # #process URLAgeTask event
22 
23 
24  # #constructor
25  #
26  # @param keyValueStorageDir path to keyValue storage work dir
27  # @param rawDataDir path to raw data dir
28  def __init__(self, keyValueStorageDir, rawDataDir, backDBResolve):
29  super(URLAgeTask, self).__init__()
30  self.uRLDeleteTask = URLDeleteTask(keyValueStorageDir, rawDataDir, backDBResolve)
31  self.urlsSelectDict = {}
32  self.gloablLoopExit = False
33  self.curUrlsCount = 0
34 
35 
36  # #performs the actual URLDelete operation for the selected URLs
37  #
38  # @param queryCallback - sql execute callback function
39  def urlDeleteOperation(self, queryCallback):
40  urlsDeleteObjs = []
41  localUrlDelete = None
42  for siteId in self.urlsSelectDict:
43  for urlMd5 in self.urlsSelectDict[siteId]:
44  localUrlDelete = dc.EventObjects.URLDelete(siteId, urlMd5, reason=dc.EventObjects.URLDelete.REASON_AGING)
45  localUrlDelete.urlType = dc.EventObjects.URLStatus.URL_TYPE_MD5
46  localUrlDelete.delayedType = self.urlsSelectDict[siteId][urlMd5]
47  urlsDeleteObjs.append(localUrlDelete)
48  StatisticLogManager.statisticUpdate(queryCallback, Constants.StatFreqConstants.FREQ_AGED_STATE, 1,
49  siteId, urlMd5)
50  if len(urlsDeleteObjs) > 0:
51  logger.debug(">>> URLAge started URLDelete count = " + str(len(urlsDeleteObjs)))
52  self.uRLDeleteTask.process(urlsDeleteObjs, queryCallback)
53 
54 
55  # #addElemInLocalDict adds a new element to the local dict, checks limits
56  #
57  # @param siteId - site's Id
58  # @param UrlMd5 - url's Md5
59  # @param urlLimit - global urls limit
60  # @param delayedType - delayed type (used in URLDelete operation)
61  def addElemInLocalDict(self, siteId, UrlMd5, urlLimit, delayedType):
62  if siteId not in self.urlsSelectDict:
63  self.urlsSelectDict[siteId] = {}
64  if UrlMd5 in self.urlsSelectDict[siteId]:
65  logger.debug(">>> " + siteId + "." + UrlMd5 + " Already selected")
66  else:
67  if self.curUrlsCount < urlLimit:
68  self.urlsSelectDict[siteId][UrlMd5] = delayedType
69  logger.debug(">>> " + siteId + "." + UrlMd5 + " Added")
70  self.curUrlsCount += 1
71  else:
72  logger.debug(">>> UrlLimit reached = " + str(urlLimit))
73 
74 
75  # #performs all actions necessary to age URLs data out of the db
76  #
77  # @param urlAges list of URLAge objects
78  # @param queryCallback function for queries execution
79  # @return generalResponse instance of GeneralResponse object
80  def process(self, urlAges, queryCallback):
81  self.curUrlsCount = 0
82  generalResponse = GeneralResponse()
83  self.urlsSelectDict = {}
84  URL_SELECT_TEMPL = "SELECT `UrlMd5` FROM %s"
85  for urlAge in urlAges:
86  defaultUrlsCriterions = urlAge.urlsCriterions[dc.EventObjects.URLAge.CRITERION_WHERE]
87  if self.gloablLoopExit:
88  break
89  query = "SELECT `Id` FROM `sites`"
90  sitesCriterionStr = self.generateCriterionSQL(urlAge.sitesCriterions)
91  if len(sitesCriterionStr) > 0:
92  query += " " + sitesCriterionStr
93  sitesRes = queryCallback(query, Constants.PRIMARY_DB_ID)
94  if sitesRes is not None:
95  for sitesElem in sitesRes:
96  if self.gloablLoopExit:
97  break
98  if sitesElem is not None and len(sitesElem) > 0:
99  # StatisticLogManager.logUpdate(queryCallback, "LOG_URL_AGING", urlAge, sitesElem[0], "")
100  # Get the alternate URLs select criterion from the sites_properties table
101  queryAltURLsCrit = \
102  "SELECT `Value` FROM `sites_properties` WHERE `Site_Id`='%s' AND `Name`='AGING_URL_CRITERION' LIMIT 1"\
103  % sitesElem[0]
104  altURLsCritRes = queryCallback(queryAltURLsCrit, Constants.PRIMARY_DB_ID)
105  criterionsSubstituted = False
106  if altURLsCritRes is not None:
107  for altURLsCritItem in altURLsCritRes:
108  if altURLsCritItem is not None and len(altURLsCritItem) > 0:
109  # Overwrite criterion WHERE with value from dc_sites.sites_properties
110  urlAge.urlsCriterions[dc.EventObjects.URLAge.CRITERION_WHERE] = altURLsCritItem[0]
111  criterionsSubstituted = True
112  if not criterionsSubstituted:
113  urlAge.urlsCriterions[dc.EventObjects.URLAge.CRITERION_WHERE] = defaultUrlsCriterions
114  # Make criterion for URLs select
115  tableName = Constants.DC_URLS_TABLE_NAME_TEMPLATE % sitesElem[0]
116  query = URL_SELECT_TEMPL % tableName
117  urlsCriterionStr = self.generateCriterionSQL(urlAge.urlsCriterions, None, sitesElem[0])
118  if len(urlsCriterionStr) > 0:
119  query += " " + urlsCriterionStr
120  # Select URLs
121  urlsRes = queryCallback(query, Constants.SECONDARY_DB_ID)
122  for urlsRes in urlsRes:
123  if self.gloablLoopExit:
124  break
125  if urlsRes is not None and len(urlsRes) > 0:
126  self.addElemInLocalDict(sitesElem[0], urlsRes[0], urlAge.maxURLs, urlAge.delayedType)
127  StatisticLogManager.logUpdate(queryCallback, "LOG_URL_AGING", urlAge, sitesElem[0], urlsRes[0])
128  if len(self.urlsSelectDict) > 0:
129  self.urlDeleteOperation(queryCallback)
130  return generalResponse
GeneralResponse event object, represents general state response for multipurpose usage.
def __init__(self, keyValueStorageDir, rawDataDir, backDBResolve)
Definition: URLAgeTask.py:28
def process(self, urlAges, queryCallback)
Definition: URLAgeTask.py:80
def urlDeleteOperation(self, queryCallback)
Definition: URLAgeTask.py:39
def addElemInLocalDict(self, siteId, UrlMd5, urlLimit, delayedType)
Definition: URLAgeTask.py:61
def generateCriterionSQL(self, criterions, additionWhere=None, siteId=None)
Definition: BaseTask.py:46