4 @link: http://hierarchical-cluster-engine.com/ 5 @copyright: Copyright © 2013-2014 IOIX Ukraine 6 @license: http://hierarchical-cluster-engine.com/license/ 13 from datetime
import datetime
36 def __init__(self, keyValueStorageDir, rawDataDir, dBDataTask, dcSiteTemplate, keyValueDefaultFile, dcStatTemplate,
49 keyValueDefaultFile, dcStatTemplate, dcLogTemplate,
50 Constants.DEFAULT_LOCK_TTL)
68 def process(self, urlContentRequests, queryCallback):
69 urlContentResponses = []
71 for urlContentRequest
in urlContentRequests:
72 if urlContentRequest
is None:
73 urlContentResponses.append(
None)
74 elif hasattr(urlContentRequest,
"urlFetch")
and urlContentRequest.urlFetch
is not None:
76 urlFetches.append(urlContentRequest.urlFetch)
79 urlContentRequest.urlMd5 =
"" 80 urlContentRequest.siteId = url.siteId
81 urlContentRequest.url = url.url
83 StatisticLogManager.logUpdate(queryCallback,
"LOG_URL_CONTENT", urlContentRequest, urlContentRequest.siteId,
84 urlContentRequest.urlMd5)
85 urlContentResponses.append(self.
getURLContent(urlContentRequest, queryCallback))
88 StatisticLogManager.logUpdate(queryCallback,
"LOG_URL_CONTENT", urlContentRequest, urlContentRequest.siteId,
89 urlContentRequest.urlMd5)
90 urlContentResponses.append(self.
getURLContent(urlContentRequest, queryCallback))
91 logger.debug(
">>> urlContentResponses len = %s", str(len(urlContentResponses)))
94 return urlContentResponses
101 if urlContentRequest.siteId ==
"":
102 urlContentRequest.siteId =
"0" 103 if urlContentRequest.urlMd5
is None or urlContentRequest.urlMd5 ==
"":
104 urlContentRequest.urlMd5 = urlContentRequest.fillMD5(urlContentRequest.url)
113 def genDBFields(self, dbFieldsList, dbFieldsListDefaultValues, row):
115 for fName
in dbFieldsList:
116 if fName
in dbFieldsListDefaultValues:
117 ret[fName] = dbFieldsListDefaultValues[fName]
119 for fName
in dbFieldsList:
120 if fName
is not None:
122 if fName
in [
"UDate",
"CDate",
"LastModified",
"TcDate",
"PDate"]:
123 ret[str(fName)] = str(row[fName])
125 ret[str(fName)] = row[fName]
127 ret[str(fName)] =
None 140 if dataFetchResponse
is not None and len(dataFetchResponse.resultDict) > 0:
141 if ProcessedContentInternalStruct.DATA_FIELD
in dataFetchResponse.resultDict
and \
142 dataFetchResponse.resultDict[ProcessedContentInternalStruct.DATA_FIELD]
is not None and \
143 ProcessedContentInternalStruct.CDATE_FIELD
in dataFetchResponse.resultDict
and \
144 dataFetchResponse.resultDict[ProcessedContentInternalStruct.CDATE_FIELD]
is not None:
145 ret = ProcessedContentInternalStruct.parseProcessedBuf(\
146 dataFetchResponse.resultDict[ProcessedContentInternalStruct.DATA_FIELD], \
147 dataFetchResponse.resultDict[ProcessedContentInternalStruct.CDATE_FIELD], contentMask)
148 logger.debug(
">>> ret_content == " + str(ret))
149 logger.debug(
">>> UrlContent result = " + str(dataFetchResponse.__dict__))
160 tableName = Constants.DC_URLS_TABLE_NAME_TEMPLATE % siteId
161 SELECT_URL_QUERY =
"SELECT * FROM %s WHERE `URLMd5` = '%s'" 162 query = SELECT_URL_QUERY % (tableName, urlMD5)
163 res = queryCallback(query, Constants.SECONDARY_DB_ID, Constants.EXEC_NAME)
164 if hasattr(res,
'__iter__')
and len(res) >= 1:
def fillLists(self, filePath, elemList, typeId=dc.EventObjects.Content.CONTENT_RAW_CONTENT):
    """Append the content of one stored file to elemList.

    When filePath is a regular file, its content is read, decoded as UTF-8,
    base64-encoded and wrapped in a dc.EventObjects.Content together with
    the file's ctime (ISO format with a space separator) and typeId.
    When the path is not a regular file, or reading raises IOError, None is
    appended instead and the reason is logged at debug level.

    @param filePath - path of the file to load
    @param elemList - list that receives the Content object or None
    @param typeId - content type id stored in the created Content object
    """
    if os.path.isfile(filePath):
        try:
            # The context manager closes the handle even when read(), decode()
            # or the Content constructor raises.
            # NOTE(review): open mode assumed to be the default ('r') - confirm
            # against the original open() call.
            with open(filePath) as fd:
                raw_content = fd.read()
            localDate = datetime.fromtimestamp(os.path.getctime(filePath))
            encoded = base64.b64encode(raw_content.decode('utf-8'))
            elemList.append(dc.EventObjects.Content(encoded, localDate.isoformat(' '), typeId))
        except IOError as err:
            elemList.append(None)
            # str(err) carries errno/strerror; err.message is deprecated and is
            # typically empty for an IOError built from (errno, strerror).
            logger.debug(">>> IOError with file = %s MSG = %s", str(filePath), str(err))
    else:
        elemList.append(None)
        logger.debug(">>> No file = %s", str(filePath))
193 def contentRaw(self, fList, isBreak, contentTypeId, parseAdditionType):
196 for filePath
in fList:
197 if os.path.isfile(filePath):
200 raw_content = fd.read()
201 localDate = datetime.fromtimestamp(os.path.getctime(filePath))
206 except IOError
as err:
207 logger.debug(
">>> IOError with file = %s MSG = %s", str(filePath), str(err.message))
209 if wasOpen
and parseAdditionType:
210 filePath = filePath[0: len(DC_CONSTANTS.RAW_DATA_SUFF) * -1]
211 filePath += DC_CONSTANTS.RAW_DATA_HEADERS_SUFF
212 if self.
contentMask & dc.EventObjects.URLContentRequest.CONTENT_TYPE_HEADERS:
213 self.
fillLists(filePath, self.
headers, dc.EventObjects.Content.CONTENT_HEADERS_CONTENT)
214 filePath = filePath[0: len(DC_CONSTANTS.RAW_DATA_HEADERS_SUFF) * -1]
215 filePath += DC_CONSTANTS.RAW_DATA_REQESTS_SUFF
216 if self.
contentMask & dc.EventObjects.URLContentRequest.CONTENT_TYPE_REQUESTS:
217 self.
fillLists(filePath, self.
requests, dc.EventObjects.Content.CONTENT_REQUESTS_CONTENT)
218 filePath = filePath[0: len(DC_CONSTANTS.RAW_DATA_REQESTS_SUFF) * -1]
219 filePath += DC_CONSTANTS.RAW_DATA_META_SUFF
220 if self.
contentMask & dc.EventObjects.URLContentRequest.CONTENT_TYPE_META:
221 self.
fillLists(filePath, self.
meta, dc.EventObjects.Content.CONTENT_META_CONTENT)
222 filePath = filePath[0: len(DC_CONSTANTS.RAW_DATA_META_SUFF) * -1]
223 filePath += DC_CONSTANTS.RAW_DATA_COOKIES_SUFF
224 if self.
contentMask & dc.EventObjects.URLContentRequest.CONTENT_TYPE_COOKIES:
225 self.
fillLists(filePath, self.
cookies, dc.EventObjects.Content.CONTENT_COOKIES_CONTENT)
def contentRawCommon(self, dataDir, localReverse=False, allFiles=False, rawDataSuff=DC_CONSTANTS.RAW_DATA_SUFF,
                     contentTypeId=dc.EventObjects.Content.CONTENT_RAW_CONTENT, parseAdditionType=True):
    """Collect the files in dataDir that end with rawDataSuff and pass them to contentRaw.

    @param dataDir - directory that is globbed for files
    @param localReverse - passed as the `reverse` flag when ordering the files by ctime
    @param allFiles - its negation is passed to contentRaw as the isBreak argument
    @param rawDataSuff - file name suffix used to build the glob mask
    @param contentTypeId - forwarded unchanged to contentRaw
    @param parseAdditionType - forwarded unchanged to contentRaw
    """
    pattern = dataDir + "/*" + rawDataSuff
    # The message label says "fList" but the value logged is the glob mask.
    logger.debug(">>> contentRaw fList = " + str(pattern))
    matched = glob.glob(pattern)
    # In-place stable sort by ctime; same ordering as sorted() with these arguments.
    matched.sort(key=os.path.getctime, reverse=localReverse)
    stopAfterFirst = not allFiles
    self.contentRaw(matched, stopAfterFirst, contentTypeId, parseAdditionType)
251 if self.
contentMask & dc.EventObjects.URLContentRequest.CONTENT_TYPE_RAW_LAST:
253 if self.
contentMask & dc.EventObjects.URLContentRequest.CONTENT_TYPE_RAW_FIRST:
255 if self.
contentMask & dc.EventObjects.URLContentRequest.CONTENT_TYPE_RAW_ALL:
265 dataDir = self.
rawDataDir +
"/" + urlContentRequest.siteId +
"/" +
PathMaker(urlContentRequest.urlMd5).getDir()
267 self.
contentMask = urlContentRequest.contentTypeMask
269 if self.
contentMask & (dc.EventObjects.URLContentRequest.CONTENT_TYPE_PROCESSED | \
270 dc.EventObjects.URLContentRequest.CONTENT_TYPE_PROCESSED_INTERNAL | \
271 dc.EventObjects.URLContentRequest.CONTENT_TYPE_PROCESSED_CUSTOM):
274 if self.
contentMask & dc.EventObjects.URLContentRequest.CONTENT_TYPE_RAW:
275 if self.
contentMask & dc.EventObjects.URLContentRequest.CONTENT_TYPE_RAW_LAST:
277 if self.
contentMask & dc.EventObjects.URLContentRequest.CONTENT_TYPE_RAW_FIRST:
279 if self.
contentMask & dc.EventObjects.URLContentRequest.CONTENT_TYPE_RAW_ALL:
283 dc.EventObjects.Content.CONTENT_TIDY_CONTENT, DC_CONSTANTS.RAW_DATA_TIDY_SUFF,
287 dc.EventObjects.Content.CONTENT_DYNAMIC_CONTENT, DC_CONSTANTS.RAW_DATA_DYNAMIC_SUFF,
291 dc.EventObjects.Content.CONTENT_CHAIN_PARTS, DC_CONSTANTS.RAW_DATA_CHAIN_SUFF,
294 logger.debug(
"!!!!! self.processedContents: %s", Utils.varDump(self.
processedContents, stringifyType=0, ensure_ascii=
False, strTypeMaxLen=5000))
301 row = self.
selectURLFromMySQL(urlContentRequest.siteId, urlContentRequest.urlMd5, queryCallback)
305 ret.status = row[
"Status"]
309 ret.urlMd5 = row[
"URLMd5"]
310 if "RawContentMd5" in row:
311 ret.rawContentMd5 = row[
"RawContentMd5"]
312 if "ContentURLMd5" in row:
313 ret.contentURLMd5 = row[
"ContentURLMd5"]
315 ret.siteId = row[
"Site_Id"]
316 if hasattr(urlContentRequest.dbFieldsList,
'__iter__')
and len(urlContentRequest.dbFieldsList) > 0:
317 ret.dbFields = self.
genDBFields(urlContentRequest.dbFieldsList, \
318 urlContentRequest.dbFieldsListDefaultValues, \
321 if self.
contentMask & dc.EventObjects.URLContentRequest.CONTENT_TYPE_ATTRIBUTES:
322 if ret.urlMd5
is not None and ret.urlMd5 !=
"" and ret.siteId
is not None:
323 ret.attributes = AttrFetchTask.fetchUrlsAttributesByNames(ret.siteId,
326 urlContentRequest.attributeNames)
def contentRaw(self, fList, isBreak, contentTypeId, parseAdditionType)
def genDBFields(self, dbFieldsList, dbFieldsListDefaultValues, row)
def __init__(self, keyValueStorageDir, rawDataDir, dBDataTask, dcSiteTemplate, keyValueDefaultFile, dcStatTemplate, dcLogTemplate)
def process(self, urlContentRequests, queryCallback)
def selectURLFromMySQL(self, siteId, urlMD5, queryCallback)
def fillAdditionContentTypes(self, typeMask, typeId, suff, dataDir)
def contentProcessed(self, dataDir, urlContentRequest, contentMask, queryCallback)
def contentRawCommon(self, dataDir, localReverse=False, allFiles=False, rawDataSuff=DC_CONSTANTS.RAW_DATA_SUFF, contentTypeId=dc.EventObjects.Content.CONTENT_RAW_CONTENT, parseAdditionType=True)
def getURLContent(self, urlContentRequest, queryCallback)
def fillLists(self, filePath, elemList, typeId=dc.EventObjects.Content.CONTENT_RAW_CONTENT)
def calcEmptyFields(self, urlContentRequest)