4 @link: http://hierarchical-cluster-engine.com/ 5 @copyright: Copyright © 2013-2014 IOIX Ukraine 6 @license: http://hierarchical-cluster-engine.com/license/ 29 def __init__(self, keyValueStorageDir, rawDataDir, dBDataTask, dcSiteTemplate, keyValueDefaultFile,
30 dcStatTemplate, dcLogTemplate, mutexLockTTL=Constants.DEFAULT_LOCK_TTL):
37 dBDataTask, dcStatTemplate, dcLogTemplate,
None)
46 def process(self, urlFetches, queryCallback):
50 for urlFetch
in urlFetches:
51 needToLock = urlFetch.isLocking
53 self.
dbLock(Constants.FETCH_LOCK_NAME, queryCallback, urlFetch.lockIterationTimeout, self.
mutexLockTTL)
55 if urlFetch.algorithm
is None or urlFetch.algorithm == dc.EventObjects.URLFetch.DEFAULT_ALGORITHM:
57 elif urlFetch.algorithm == dc.EventObjects.URLFetch.PROPORTIONAL_ALGORITHM:
61 attributeNames = urlFetch.attributeNames
if hasattr(urlFetch,
'attributeNames')
else [
'*']
62 url.attributes = AttrFetchTask.fetchUrlsAttributesByNames(url.siteId,
68 self.
dbUnlock(Constants.FETCH_LOCK_NAME, queryCallback)
69 except Exception, err:
70 logger.error(
'Exception: %s', str(err))
72 self.
dbUnlock(Constants.FETCH_LOCK_NAME, queryCallback)
74 except mdb.Error, err:
75 logger.error(
'mdb.Error: %s', str(err))
77 self.
dbUnlock(Constants.FETCH_LOCK_NAME, queryCallback)
80 logger.error(
'Unknown type error')
82 self.
dbUnlock(Constants.FETCH_LOCK_NAME, queryCallback)
96 if urlFetch.maxURLs
is not None and urlFetch.maxURLs > 0:
97 urlFetch.sitesList = self.
fillSiteList(urlFetch.sitesList, queryCallback, urlFetch.sitesCriterions)
100 if len(urlFetch.sitesList) > 0:
101 self.
quantMaxUrls = urlFetch.maxURLs / len(urlFetch.sitesList)
105 while len(ret) < urlFetch.maxURLs
and len(urlFetch.sitesList) > 0:
108 logger.debug(
">>> Debug-1 = %s %s %s %s",
109 str(urlFetch.urlsCriterions[dc.EventObjects.URLFetch.CRITERION_LIMIT][0]),
110 str(urlFetch.urlsCriterions[dc.EventObjects.URLFetch.CRITERION_LIMIT][1]),
111 str(urlFetch.maxURLs),
117 logger.debug(
">>> UrlFetch proportional >>> Empty SiteList")
118 self.
updateUrl(ret, urlFetch, uRLUpdateTask, queryCallback)
127 if urlFetch.urlsCriterions
is None:
128 urlFetch.urlsCriterions = {}
129 urlFetch.urlsCriterions[dc.EventObjects.URLFetch.CRITERION_LIMIT] = [offset, self.
quantMaxUrls]
139 urlFetch.sitesList = self.
fillSiteList(urlFetch.sitesList, queryCallback, urlFetch.sitesCriterions)
144 self.
updateUrl(ret, urlFetch, uRLUpdateTask, queryCallback)
155 def updateUrl(self, urls, urlFetch, uRLUpdateTask, queryCallback):
157 if urlFetch.urlUpdate
is not None:
158 for localURL
in urls:
159 if localURL.urlMd5
is not None:
161 urlUpdateList.append(urlFetch.urlUpdate)
162 urlUpdateList[0].urlMd5 = localURL.urlMd5
163 urlUpdateList[0].siteId = localURL.siteId
164 uRLUpdateTask.process(urlUpdateList, queryCallback)
165 if urlFetch.urlUpdate.status
is not None:
166 localURL.status = urlFetch.urlUpdate.status
175 query = Constants.SELECT_SQL_TEMPLATE_SIMPLE % (
"`Id`",
"sites")
176 additionQueryStr =
"" 177 additionWhereCause =
"" 178 if userId
is not None:
179 additionWhereCause += (Constants.CHECK_TABLE_SQL_ADDITION % str(userId))
181 if sitesCriterions
is None:
182 if additionWhereCause !=
"":
183 additionWhereCause +=
" AND " 184 additionWhereCause += (
"`state`=%d" % dc.EventObjects.Site.STATE_ACTIVE)
186 if sitesCriterions
is None:
187 if additionWhereCause !=
"":
188 additionQueryStr =
" WHERE " + additionWhereCause
190 if additionWhereCause ==
"":
194 query += additionQueryStr
206 def fillSiteList(self, incomeSiteList, queryCallback, sitesCriterions, userId=None):
211 logger.debug(
"!!! query: %s", str(query))
214 res = queryCallback(query, Constants.PRIMARY_DB_ID)
215 logger.debug(
"!!! res: %s", str(res))
216 except mdb.Error, err:
217 logger.error(
"Error: %s", str(err))
219 if res
is not None and hasattr(res,
'__iter__'):
221 localSiteDict[row[0]] = row[0]
222 newSiteList.append(row[0])
224 if len(incomeSiteList) == 0:
228 for site
in incomeSiteList:
229 if site
in localSiteDict:
230 newSiteList.append(site)
242 url =
dc.EventObjects.URL(row[
"Site_Id"], row[
"URL"], normalizeMask=UrlNormalizator.NORM_NONE)
243 for field
in Constants.URLTableDict.keys():
244 if hasattr(url, field)
and Constants.URLTableDict[field]
in row:
245 setattr(url, field, row[Constants.URLTableDict[field]])
246 url.UDate = Constants.readDataTimeField(
"UDate", row)
247 url.CDate = Constants.readDataTimeField(
"CDate", row)
248 url.lastModified = Constants.readDataTimeField(
"LastModified", row)
249 url.tcDate = Constants.readDataTimeField(
"TcDate", row)
250 url.pDate = Constants.readDataTimeField(
"PDate", row)
262 for siteId
in urlFetch.sitesList:
264 if urlFetch.siteUpdate
is not None:
265 urlFetch.siteUpdate.id = siteId
267 logger.debug(
"Site %s updated", str(siteId))
269 tableName = Constants.DC_URLS_TABLE_NAME_TEMPLATE % siteId
273 if urlFetch.CRITERION_SQL
in urlFetch.urlsCriterions
and len(urlFetch.urlsCriterions[urlFetch.CRITERION_SQL]) > 0:
274 additionalSQLs = self.
execAdditionalSQLs(urlFetch.urlsCriterions[urlFetch.CRITERION_SQL], siteId, queryCallback)
278 for sql
in additionalSQLs:
279 additionQueryStr = additionQueryStr.replace(
"%" + sql +
"%", additionalSQLs[sql])
281 query = Constants.SELECT_SQL_TEMPLATE_SIMPLE % (
"*", tableName)
282 if len(additionQueryStr) > 0:
284 query += additionQueryStr
286 logger.debug(
'>>> getURLFromURLTable UrlFetchQuery: ' + str(query))
288 res = queryCallback(query, Constants.SECONDARY_DB_ID, Constants.EXEC_NAME)
290 if not hasattr(res,
'__iter__'):
293 logger.debug(
'>>> getURLFromURLTable len(res): ' + str(len(res)))
301 if urlFetch.maxURLs
is not None and urlFetch.maxURLs != 0
and urlFetch.maxURLs == globalLen:
303 if "Site_Id" in row
and "URL" in row:
304 urls.append(URLFetchTask.fillUrlObj(row))
306 if urlFetch.maxURLs
is not None and urlFetch.maxURLs != 0
and urlFetch.maxURLs == globalLen:
309 logger.debug(
"Return urlsFetch count = %s", str(len(urls)))
321 q = sqls[key].replace(
"%" + Constants.SITE_ID_NAME +
"%", str(siteId))
322 r = queryCallback(q, Constants.PRIMARY_DB_ID)
323 if hasattr(r,
'__iter__')
and len(r) > 0
and r[0][0] !=
None:
328 logger.debug(
"Additional SQLs: %s", str(ret))
def fillSiteList(self, incomeSiteList, queryCallback, sitesCriterions, userId=None)
def fillCriterionLimits(self, urlFetch, offset)
def processProportial(self, urlFetch, uRLUpdateTask, queryCallback)
def processSimple(self, urlFetch, uRLUpdateTask, queryCallback)
def dbLock(self, mutexName, queryCallback, sleepTime=1, mutexLockTTL=Constants.DEFAULT_LOCK_TTL)
def execAdditionalSQLs(self, sqls, siteId, queryCallback)
def updateUrl(self, urls, urlFetch, uRLUpdateTask, queryCallback)
def process(self, urlFetches, queryCallback)
def getURLFromURLTable(self, urlFetch, globalLen, queryCallback)
def fillSiteListSQLGenerate(self, sitesCriterions, userId=None)
def dbUnlock(self, mutexName, queryCallback)
def __init__(self, keyValueStorageDir, rawDataDir, dBDataTask, dcSiteTemplate, keyValueDefaultFile, dcStatTemplate, dcLogTemplate, mutexLockTTL=Constants.DEFAULT_LOCK_TTL)
def generateCriterionSQL(self, criterions, additionWhere=None, siteId=None)