HCE Project Python language Distributed Tasks Manager Application, Distributed Crawler Application and client API bindings.  2.0.0-chaika
Hierarchical Cluster Engine Python language binding
URLUpdateTask.py
Go to the documentation of this file.
1 '''
2 @package: dc
3 @author igor
4 @link: http://hierarchical-cluster-engine.com/
5 @copyright: Copyright © 2013-2014 IOIX Ukraine
6 @license: http://hierarchical-cluster-engine.com/license/
7 @since: 0.1
8 '''
9 
10 from dtm.EventObjects import GeneralResponse
11 import dc_db.Constants as Constants
12 from dc_db.BaseTask import BaseTask
13 from dc_db.StatisticLogManager import StatisticLogManager
14 from dc_db.URLPutTask import URLPutTask
15 from dc_db.AttrUpdateTask import AttrUpdateTask
16 import app.Utils as Utils # pylint: disable=F0401
17 
18 logger = Utils.MPLogger().getLogger()
19 
20 # #URLUpdateTask: processes urlUpdate events (updates URL records in the db)
22 
23  # #constructor
24  #
def __init__(self, keyValueStorageDir, rawDataDir, dBDataTask):
    """Initialise the base task and create the internal URLPutTask helper.

    All three arguments are forwarded unchanged to the URLPutTask
    constructor; this class does not use them directly.
    """
    super(URLUpdateTask, self).__init__()
    putTask = URLPutTask(keyValueStorageDir, rawDataDir, dBDataTask)
    # Kept on the instance so urlPutOperation() can delegate to it later.
    self.urlPutTask = putTask
28 
29 
30  # #make all necessary actions to get update urls data in db
31  #
32  # @param urlUpdates list of URLUpdate objects
33  # @param queryCallback function for queries execution
34  # @return generalResponse instance of GeneralResponse object
def process(self, urlUpdates, queryCallback):
    """Apply every URLUpdate object to the db and collect per-item statuses.

    For each item: normalise an empty siteId to "0", fill the MD5 when the
    item has no urlMd5 attribute, and - if the site exists - update the
    statistics, the URL record and (on success) its attributes. One boolean
    status per item is appended to the response. An attached urlPut object
    is then handed to urlPutOperation().

    @param urlUpdates list of URLUpdate objects
    @param queryCallback function for queries execution
    @return GeneralResponse instance whose statuses list has one entry per item
    """
    response = GeneralResponse()
    for item in urlUpdates:
        if item.siteId == "":
            item.siteId = "0"
        # Only a *missing* attribute triggers the fill; an explicit None is
        # left alone (updateURL() handles urlMd5 being None).
        if not hasattr(item, "urlMd5"):
            item.fillMD5(item.url, item.type)

        updated = False
        if self.isSiteExist(item.siteId, queryCallback):
            self.statisticUpdate(item, queryCallback)
            updated = self.updateURL(item, queryCallback)
            if updated:
                if item.attributes is not None and len(item.attributes) > 0:
                    self.attributesUpdate(item.attributes, queryCallback)
        response.statuses.append(updated)

        # NOTE(review): nesting reconstructed from a flattened listing - the
        # urlPut step is assumed to run for every item, regardless of whether
        # the site exists; confirm against the original file.
        if "urlPut" in vars(item) and item.urlPut is not None:
            self.urlPutOperation(item, item.urlPut, queryCallback)
    return response
53 
54 
55  # #update records in statistic and log db
56  #
57  # @param urlUpdate instance of URLUpdate object
58  # @param queryCallback function for queries execution
def statisticUpdate(self, urlUpdate, queryCallback):
    """Update the statistic and log records for one URL update.

    Reads the URL's currently stored Status, bumps the update-frequency
    statistic, writes a "LOG_UPDATE" log entry, and calls
    statisticLogUpdate() when no stored status was found or it differs from
    urlUpdate.status.

    @param urlUpdate instance of URLUpdate object
    @param queryCallback function for queries execution
    """
    # NOTE(review): the SQL is built by string interpolation of siteId and
    # urlMd5; callers must ensure these values are trusted.
    selectTemplate = "SELECT `Status` FROM `%s` WHERE `URLMD5` = '%s'"
    urlsTable = Constants.DC_URLS_TABLE_NAME_TEMPLATE % urlUpdate.siteId
    rows = queryCallback(selectTemplate % (urlsTable, urlUpdate.urlMd5), Constants.SECONDARY_DB_ID)

    # Stays None when the query yields no usable first cell.
    storedStatus = None
    if rows is not None and len(rows) > 0 and len(rows[0]) > 0 and rows[0][0] is not None:
        storedStatus = int(rows[0][0])

    StatisticLogManager.statisticUpdate(queryCallback, Constants.StatFreqConstants.FREQ_UPDATE, 1,
                                        urlUpdate.siteId, urlUpdate.urlMd5)
    StatisticLogManager.logUpdate(queryCallback, "LOG_UPDATE", urlUpdate, urlUpdate.siteId, urlUpdate.urlMd5)

    statusChanged = storedStatus is None or storedStatus != urlUpdate.status
    if statusChanged:
        self.statisticLogUpdate(urlUpdate, urlUpdate.urlMd5, urlUpdate.siteId, urlUpdate.status, queryCallback)
72 
73 
74  # #update url in db
75  #
76  # @param urlUpdate instance of URLUpdate object
77  # @param queryCallback function for queries execution
def updateURL(self, urlUpdate, queryCallback):
    """Build and execute the UPDATE statement for one URL record.

    Surrounding quote characters are stripped from eTag (the object is
    modified in place). The SET clause is produced by the Constants
    helpers; when it is empty no query is issued.

    @param urlUpdate instance of URLUpdate object
    @param queryCallback function for queries execution
    @return True when an UPDATE query was issued, False otherwise
    """
    if urlUpdate.eTag is not None:
        urlUpdate.eTag = urlUpdate.eTag.strip("\"'")

    fields, values = Constants.getFieldsValuesTuple(urlUpdate, Constants.URLTableDict)
    setClause = Constants.createFieldsValuesString(fields, values, Constants.urlExcludeList)
    if not (setClause and len(setClause) > 0):
        # Nothing to set - skip the query entirely.
        return False

    urlsTable = Constants.DC_URLS_TABLE_NAME_TEMPLATE % urlUpdate.siteId
    sql = "UPDATE IGNORE `%s` SET %s" % (urlsTable, setClause)

    # The MD5 condition is optional; without it only the criterions apply.
    if urlUpdate.urlMd5 is None:
        md5Where = None
    else:
        md5Where = ("`URLMD5` = '%s'" % urlUpdate.urlMd5)

    criterionSql = self.generateCriterionSQL(urlUpdate.criterions, md5Where)
    if len(criterionSql) > 0:
        sql = sql + " " + criterionSql

    # The callback's result is not inspected: issuing the query counts as success.
    queryCallback(sql, Constants.SECONDARY_DB_ID)
    return True
98 
99 
100  # #makes URLPutTask operation
101  #
102  # @param urlObject instance of URL or URLUpdate object
103  # @param urlPutObject instance of URLPut object
104  # @param queryCallback function for queries execution
def urlPutOperation(self, urlObject, urlPutObject, queryCallback):
    """Run the internal URLPutTask for a URLPut object attached to a URL.

    Missing siteId / urlMd5 values on the URLPut object are inherited from
    the owning URL object before the task is executed (the URLPut object is
    modified in place).

    @param urlObject instance of URL or URLUpdate object
    @param urlPutObject instance of URLPut object
    @param queryCallback function for queries execution
    """
    if urlPutObject.siteId is None and urlObject.siteId is not None:
        urlPutObject.siteId = urlObject.siteId
        # str() keeps the debug line from raising TypeError on a non-string id.
        logger.debug(">>> URLPut.siteId is None and set to the = " + str(urlPutObject.siteId))
    if urlPutObject.urlMd5 is None and urlObject.urlMd5 is not None:
        urlPutObject.urlMd5 = urlObject.urlMd5
        logger.debug(">>> URLPut.urlMd5 is None and set to the = " + str(urlPutObject.urlMd5))
    logger.debug(">>> Call internal URLPut")
    # URLPutTask.process() takes a list, so the single object is wrapped.
    self.urlPutTask.process([urlPutObject], queryCallback)
114 
115 
116  # #updatesAttributes
117  #
118  # @param attributes list of AttributeUpdate objects
119  # @param queryCallback function for queries execution
def attributesUpdate(self, attributes, queryCallback):
    """Delegate an attributes update to a fresh AttrUpdateTask.

    The number of attributes and the task's result are written to the debug
    log; the result itself is not returned to the caller.

    @param attributes list of AttributeUpdate objects
    @param queryCallback function for queries execution
    """
    count = len(attributes)
    logger.debug(">>> URLUpdateTask.attributesUpdate (len) = " + str(count))
    outcome = AttrUpdateTask().process(attributes, queryCallback)
    logger.debug(">>> URLUpdateTask.attributesUpdate (res) == " + str(outcome))
def isSiteExist(self, siteId, queryCallback, userId=None)
Definition: BaseTask.py:29
def urlPutOperation(self, urlObject, urlPutObject, queryCallback)
def process(self, urlUpdates, queryCallback)
GeneralResponse event object, represents general state response for multipurpose usage.
def statisticUpdate(self, urlUpdate, queryCallback)
def attributesUpdate(self, attributes, queryCallback)
def statisticLogUpdate(self, localObj, urlMd5, siteId, status, queryCallback, isInsert=False)
Definition: BaseTask.py:154
def updateURL(self, urlUpdate, queryCallback)
def generateCriterionSQL(self, criterions, additionWhere=None, siteId=None)
Definition: BaseTask.py:46
def __init__(self, keyValueStorageDir, rawDataDir, dBDataTask)