HCE Project Python language Distributed Tasks Manager Application, Distributed Crawler Application and client API bindings.  2.0.0-chaika
Hierarchical Cluster Engine Python language binding
BaseTask.py
Go to the documentation of this file.
1 '''
2 @package: dc
3 @author igor
4 @link: http://hierarchical-cluster-engine.com/
5 @copyright: Copyright © 2013-2014 IOIX Ukraine
6 @license: http://hierarchical-cluster-engine.com/license/
7 @since: 0.1
8 '''
9 
10 import time
11 import hashlib
12 import datetime
13 import dc_db.Constants as Constants
14 from dc_db.StatisticLogManager import StatisticLogManager
15 import dc.EventObjects
16 import app.SQLCriterions
17 import app.Utils as Utils # pylint: disable=F0401
18 
19 logger = Utils.MPLogger().getLogger()
20 
21 # #Class contains operations common for all tasks
class BaseTask(object):

    # #Checks whether the given site exists in the current db
    #
    # @param siteId - id of the site to check; None yields False without running any query
    # @param queryCallback - function for queries execution, called as queryCallback(query, dbId)
    # @param userId - optional user id; when given, CHECK_TABLE_SQL_ADDITION is appended to the query
    # @return True if the first column of the first returned row is greater than zero, otherwise False
    def isSiteExist(self, siteId, queryCallback, userId=None):
        if siteId is None:
            return False
        query = Constants.CHECK_TABLE_SQL_TEMPLATE % siteId
        if userId is not None:
            query += (Constants.CHECK_TABLE_SQL_ADDITION % str(userId))
        res = queryCallback(query, Constants.PRIMARY_DB_ID)
        # Only the first element of the first tuple is inspected
        return bool(res is not None and len(res) > 0 and len(res[0]) > 0 and res[0][0] > 0)


    # #Generates criterion string from urlFetch.criterions or urlUpdate.criterions fields
    #
    # Thin wrapper that delegates to app.SQLCriterions.generateCriterionSQL.
    #
    # @param criterions - incoming criterions
    # @param additionWhere - additional WHERE clause
    # @param siteId - site's id, passed through unchanged
    # @return whatever app.SQLCriterions.generateCriterionSQL returns
    def generateCriterionSQL(self, criterions, additionWhere=None, siteId=None):
        return app.SQLCriterions.generateCriterionSQL(criterions, additionWhere, siteId)


    # #Returns ids of the `sites` rows selected by criterions
    #
    # @param criterions - incoming criterions
    # @param queryCallback - function for queries execution
    # @return list of site ids; empty when no criterion SQL was generated or the result is not iterable
    def fetchByCriterions(self, criterions, queryCallback):
        ids = []
        additionWhere = self.generateCriterionSQL(criterions)
        if additionWhere is not None and additionWhere != "":
            query = "SELECT `id` FROM `sites`" + additionWhere
            res = queryCallback(query, Constants.PRIMARY_DB_ID)
            if hasattr(res, '__iter__'):
                ids = [row[0] for row in res]
        return ids


    # #Cross-app locking, based on MySQL db, locking
    #
    # Re-issues the mutexLock() query until the first column of the first row is non-zero.
    # The loop also ends when the callback returns None or an empty result, and there is no
    # upper bound on the number of retries.
    #
    # @param mutexName - mutex name
    # @param queryCallback - function for queries execution
    # @param sleepTime - pause (in seconds) between lock tries
    # @param mutexLockTTL - value passed as the third argument of mutexLock()
    def dbLock(self, mutexName, queryCallback, sleepTime=1, mutexLockTTL=Constants.DEFAULT_LOCK_TTL):
        logger.debug(">>> BaseTask Class. Lock start name=" + mutexName)
        LOCK_QUERY_TEMPLATE = "SELECT mutexLock('%s', %s, %s)"
        query = LOCK_QUERY_TEMPLATE % (mutexName, str(Constants.DB_LOCK_APPLICATION_ID), str(mutexLockTTL))
        res = queryCallback(query, Constants.PRIMARY_DB_ID)
        while res is not None and len(res) > 0 and res[0][0] == 0:
            time.sleep(sleepTime)
            res = queryCallback(query, Constants.PRIMARY_DB_ID)
        logger.debug(">>> BaseTask Class. Lock finish name=" + mutexName)


    # #Cross-app locking, based on MySQL db, unlocking
    #
    # @param mutexName - mutex name
    # @param queryCallback - function for queries execution
    def dbUnlock(self, mutexName, queryCallback):
        logger.debug(">>> BaseTask Class. Unlock start name=" + mutexName)
        LOCK_QUERY_TEMPLATE = "SELECT mutexUnlock('%s', %s)"
        query = LOCK_QUERY_TEMPLATE % (mutexName, str(Constants.DB_LOCK_APPLICATION_ID))
        queryCallback(query, Constants.PRIMARY_DB_ID)
        logger.debug(">>> BaseTask Class. Unlock finish name=" + mutexName)


    # #Builds the insert query for the per-site urls table
    #
    # @param siteId - site's id, substituted into DC_URLS_TABLE_NAME_TEMPLATE
    # @param localKeys - url's inserting keys
    # @param localValues - url's inserting values
    # @return query string, or None when no fields/values string could be built
    def createUrlsInsertQuery(self, siteId, localKeys, localValues):
        logger.debug(">>> Create Url Insert request")
        query = None
        tbName = Constants.DC_URLS_TABLE_NAME_TEMPLATE % siteId
        fieldValueString = Constants.createFieldsValuesString(localKeys, localValues)
        if fieldValueString is not None and fieldValueString != "":
            query = Constants.INSERT_COMMON_TEMPLATE % (tbName, fieldValueString)
        return query


    # #Collects column names and SQL-ready values of one `sites_urls` row
    #
    # Only columns listed among the values of Constants.URLTableDict and holding a non-None value
    # are taken. String and datetime values are escaped with Utils.escape and wrapped in single
    # quotes, everything else is converted with str().
    #
    # @param urlRecord - one row, indexed by column name (presumably a dict - verify against caller)
    # @return tuple (keys, values) of two parallel lists
    @staticmethod
    def _collectUrlFields(urlRecord):
        # Hoisted so the table is not rescanned key by key for every column of the row
        knownColumns = tuple(Constants.URLTableDict.values())
        localKeys = []
        localValues = []
        for keyRecord in urlRecord:
            if keyRecord in knownColumns and urlRecord[keyRecord] is not None:
                value = urlRecord[keyRecord]
                localKeys.append(keyRecord)
                if isinstance(value, (basestring, datetime.datetime)):
                    localValues.append("'" + Utils.escape(str(value)) + "'")
                else:
                    localValues.append(str(value))
        return localKeys, localValues


    # #Copies site's urls from `sites_urls` into the per-site urls table
    #
    # @param siteId - site's id
    # @param queryCallback - function for queries execution
    def copyUrlsToDcUrls(self, siteId, queryCallback):
        logger.debug(">>> Urls copy operation")
        COPY_SELECT_SQL_TEMPLATE = "SELECT * FROM `sites_urls` WHERE `Site_Id`='%s'"
        query = COPY_SELECT_SQL_TEMPLATE % siteId
        rows = queryCallback(query, Constants.PRIMARY_DB_ID, Constants.EXEC_NAME)
        if rows is None:
            return
        for urlRecord in rows:
            logger.debug(">>> Urls copy operation KEY - " + str(urlRecord))
            localKeys, localValues = self._collectUrlFields(urlRecord)
            logger.debug(">>> Urls copy operation LEN - " + str(len(localKeys)))
            if len(localKeys) > 0:
                query = self.createUrlsInsertQuery(siteId, localKeys, localValues)
                if query is not None:
                    # The insert result is not used; it no longer rebinds the sequence being iterated
                    queryCallback(query, Constants.SECONDARY_DB_ID)

                # NOTE(review): the statistics branch is placed inside the "has fields" branch; the
                # listing this was restored from had lost its indentation - confirm the nesting.
                if 'URLMd5' in urlRecord and urlRecord['URLMd5'] is not None:
                    StatisticLogManager.addNewRecord(queryCallback, siteId, urlRecord['URLMd5'])
                    if 'Status' in urlRecord and urlRecord['Status'] is not None:
                        self.statisticLogUpdate(None, urlRecord['URLMd5'], siteId, urlRecord['Status'],
                                                queryCallback, True)


    # #Updates statistic and logs databases
    #
    # @param localObj - object handed to StatisticLogManager.logUpdate as is (None is passed by copyUrlsToDcUrls)
    # @param urlMd5 - url's Md5; nothing is done when it is None
    # @param siteId - site's id
    # @param status - url status, compared with the dc.EventObjects.URL.STATUS_* values
    # @param queryCallback - function for queries execution
    # @param isInsert - when True the FREQ_INSERT counter and the "LOG_INSERT" log are updated as well
    def statisticLogUpdate(self, localObj, urlMd5, siteId, status, queryCallback, isInsert=False):
        # NOTE(review): all updates are kept under the urlMd5 guard; the listing this was restored
        # from had lost its indentation - confirm the nesting.
        if urlMd5 is None:
            return
        StatisticLogManager.addNewRecord(queryCallback, siteId, urlMd5)
        if isInsert:
            StatisticLogManager.statisticUpdate(queryCallback, Constants.StatFreqConstants.FREQ_INSERT, 1, siteId,
                                                urlMd5)
            StatisticLogManager.logUpdate(queryCallback, "LOG_INSERT", localObj, siteId, urlMd5)
        # Every known status writes a log record; only NEW, CRAWLED and PROCESSED bump a counter
        if status == dc.EventObjects.URL.STATUS_NEW:
            StatisticLogManager.statisticUpdate(queryCallback, Constants.StatFreqConstants.FREQ_NEW_STATUS, 1,
                                                siteId, urlMd5)
            StatisticLogManager.logUpdate(queryCallback, "LOG_NEW", localObj, siteId, urlMd5)
        elif status == dc.EventObjects.URL.STATUS_SELECTED_CRAWLING:
            StatisticLogManager.logUpdate(queryCallback, "LOG_SELECTED_CRAWLING", localObj, siteId, urlMd5)
        elif status == dc.EventObjects.URL.STATUS_CRAWLING:
            StatisticLogManager.logUpdate(queryCallback, "LOG_CRAWLING", localObj, siteId, urlMd5)
        elif status == dc.EventObjects.URL.STATUS_CRAWLED:
            StatisticLogManager.statisticUpdate(queryCallback, Constants.StatFreqConstants.FREQ_CRAWLED_STATUS, 1,
                                                siteId, urlMd5)
            StatisticLogManager.logUpdate(queryCallback, "LOG_CRAWLED", localObj, siteId, urlMd5)
        elif status == dc.EventObjects.URL.STATUS_SELECTED_PROCESSING:
            StatisticLogManager.logUpdate(queryCallback, "LOG_SELECTED_PROCESSING", localObj, siteId, urlMd5)
        elif status == dc.EventObjects.URL.STATUS_PROCESSING:
            StatisticLogManager.logUpdate(queryCallback, "LOG_PROCESSING", localObj, siteId, urlMd5)
        elif status == dc.EventObjects.URL.STATUS_PROCESSED:
            StatisticLogManager.statisticUpdate(queryCallback, Constants.StatFreqConstants.FREQ_PROCESSED_STATS, 1,
                                                siteId, urlMd5)
            StatisticLogManager.logUpdate(queryCallback, "LOG_PROCESSED", localObj, siteId, urlMd5)


    # #calculateMd5FormUrl returns url's Md5: calculates it from the url, or returns the value as is
    #
    # @param url - incoming url (or a ready value when urlType is not URL_TYPE_URL)
    # @param urlType - url's Md5 calculating type; only URL_TYPE_URL triggers hashing
    # @param useNormilize - currently unused
    # @return hex Md5 digest of url for URL_TYPE_URL, otherwise url unchanged
    def calculateMd5FormUrl(self, url, urlType, useNormilize=False):
        # TODO: need to be refactored and usage of URL replaced with the MD5 taken from DB
        del useNormilize
        ret = url
        if urlType == dc.EventObjects.URLStatus.URL_TYPE_URL:
            # The url is passed unconverted: str() on non-ASCII unicode text raises on Python 2
            logger.debug("calculateMd5FormUrl url: %s", url)
            data = url
            if isinstance(url, type(u"")):
                # md5 works on bytes; encoding explicitly keeps the digest of ASCII urls unchanged
                # and makes non-ASCII text hashable instead of raising
                data = url.encode("utf-8")
            ret = hashlib.md5(data).hexdigest()

        return ret


    # #readValueFromSiteProp reads value from the `sites_properties` table
    #
    # @param siteId - site's id
    # @param propName - site property name
    # @param queryCallback - function for queries execution
    # @param urlMd5 - url's MD5 - additional property fetching criterion
    # @return first column of the first returned row, or None when nothing was returned
    @staticmethod
    def readValueFromSiteProp(siteId, propName, queryCallback, urlMd5=None):
        ret = None
        query = "SELECT `Value` FROM `sites_properties` WHERE `Site_Id`='%s' AND `Name`='%s'"
        query = (query % (siteId, propName))
        if urlMd5 is not None:
            query += (" AND `URLMd5`='%s'" % urlMd5)
        query += " LIMIT 1"
        res = queryCallback(query, Constants.PRIMARY_DB_ID)
        if res is not None and len(res) > 0 and len(res[0]) > 0:
            ret = res[0][0]
        return ret
def isSiteExist(self, siteId, queryCallback, userId=None)
Definition: BaseTask.py:29
def readValueFromSiteProp(siteId, propName, queryCallback, urlMd5=None)
Definition: BaseTask.py:211
def calculateMd5FormUrl(self, url, urlType, useNormilize=False)
Definition: BaseTask.py:188
def generateCriterionSQL(criterions, additionWhere=None, siteId=None)
def dbLock(self, mutexName, queryCallback, sleepTime=1, mutexLockTTL=Constants.DEFAULT_LOCK_TTL)
Definition: BaseTask.py:73
def createUrlsInsertQuery(self, siteId, localKeys, localValues)
Definition: BaseTask.py:101
def statisticLogUpdate(self, localObj, urlMd5, siteId, status, queryCallback, isInsert=False)
Definition: BaseTask.py:154
def dbUnlock(self, mutexName, queryCallback)
Definition: BaseTask.py:88
def generateCriterionSQL(self, criterions, additionWhere=None, siteId=None)
Definition: BaseTask.py:46
def copyUrlsToDcUrls(self, siteId, queryCallback)
Definition: BaseTask.py:115
def fetchByCriterions(self, criterions, queryCallback)
Definition: BaseTask.py:55