HCE Project Python language Distributed Tasks Manager Application, Distributed Crawler Application and client API bindings.  2.0.0-chaika
Hierarchical Cluster Engine Python language binding
URLContentTask.py
Go to the documentation of this file.
1 '''
2 @package: dc
3 @author igor
4 @link: http://hierarchical-cluster-engine.com/
5 @copyright: Copyright © 2013-2014 IOIX Ukraine
6 @license: http://hierarchical-cluster-engine.com/license/
7 @since: 0.1
8 '''
9 
10 import glob
11 import os
12 import base64
13 from datetime import datetime
14 
15 import dc.EventObjects
16 import dc.Constants as DC_CONSTANTS
17 import dc_db.Constants as Constants
18 from dc_db.URLFetchTask import URLFetchTask
19 from dc_db.StatisticLogManager import StatisticLogManager
20 from dc_db.ProcessedContentInternalStruct import ProcessedContentInternalStruct
21 from dc_db.AttrFetchTask import AttrFetchTask
22 from app.Utils import PathMaker
23 import app.Utils as Utils # pylint: disable=F0401
24 
# Module-level logger obtained from app.Utils.MPLogger; all URLContentTask debug output goes through it.
25 logger = Utils.MPLogger().getLogger()
26 
# #process urlContent event
#
# Builds URLContentResponse objects for URLContentRequest objects.
# - Processed content is read through DBDataTask.
# - Raw content and its companion files (headers, requests, meta, cookies, tidy, dynamic, chain)
#   are read from rawDataDir.
# - URL fields are read from the site's urls table through queryCallback.
class URLContentTask(object):

    # Date/time columns that genDBFields() returns converted with str().
    DB_DATE_FIELDS = ("UDate", "CDate", "LastModified", "TcDate", "PDate")

    # (urls table column, URLContentResponse attribute) pairs copied by getURLContent().
    ROW_TO_RESPONSE_FIELDS = (("Status", "status"),
                              ("URL", "url"),
                              ("URLMd5", "urlMd5"),
                              ("RawContentMd5", "rawContentMd5"),
                              ("ContentURLMd5", "contentURLMd5"),
                              ("Site_Id", "siteId"))


    # #constructor
    #
    # @param keyValueStorageDir path to keyValue storage work dir
    # @param rawDataDir path to raw data dir
    # @param dBDataTask instance of DBDataTask module
    # @param dcSiteTemplate, keyValueDefaultFile, dcStatTemplate, dcLogTemplate - forwarded unchanged
    #        to the internal URLFetchTask
    def __init__(self, keyValueStorageDir, rawDataDir, dBDataTask, dcSiteTemplate, keyValueDefaultFile,
                 dcStatTemplate, dcLogTemplate):
        self.keyValueStorageDir = keyValueStorageDir
        self.rawDataDir = rawDataDir
        # Initialised here (not only in clearContents) so the attribute always exists.
        self.processedContents = []
        self.rawContents = []
        self.headers = []
        self.requests = []
        self.meta = []
        self.cookies = []
        self.contentMask = None
        self.dbDataTask = dBDataTask
        self.urlFetchTask = URLFetchTask(keyValueStorageDir, rawDataDir, dBDataTask, dcSiteTemplate,
                                         keyValueDefaultFile, dcStatTemplate, dcLogTemplate,
                                         Constants.DEFAULT_LOCK_TTL)


    # #method clears main contents (called before each request is served)
    def clearContents(self):
        self.processedContents = []
        self.rawContents = []
        self.headers = []
        self.requests = []
        self.meta = []
        self.cookies = []


    # #make all necessary actions to get urls content data from storages
    #
    # A None request yields a None response. A request carrying a non-None urlFetch is first
    # resolved through URLFetchTask. In that case the same request object is re-targeted at
    # each fetched URL, producing one response per URL.
    #
    # @param urlContentRequests list of URLContentRequest objects
    # @param queryCallback function for queries execution
    # @return list of URLContentResponse objects
    def process(self, urlContentRequests, queryCallback):  # pylint: disable=W0613
        urlContentResponses = []
        # @todo add more complex case, implemented only from rawDataDir
        for urlContentRequest in urlContentRequests:
            if urlContentRequest is None:
                urlContentResponses.append(None)
            elif getattr(urlContentRequest, "urlFetch", None) is not None:
                urls = self.urlFetchTask.process([urlContentRequest.urlFetch], queryCallback)
                for url in (urls or []):
                    # Reset urlMd5 so calcEmptyFields() recomputes it for the new URL.
                    urlContentRequest.urlMd5 = ""
                    urlContentRequest.siteId = url.siteId
                    urlContentRequest.url = url.url
                    urlContentResponses.append(self._processSingleRequest(urlContentRequest, queryCallback))
            else:
                urlContentResponses.append(self._processSingleRequest(urlContentRequest, queryCallback))
        logger.debug(">>> urlContentResponses len = %s", str(len(urlContentResponses)))
        return urlContentResponses


    # #_processSingleRequest - fills empty fields, writes the statistic log entry and builds one response
    #
    # @param urlContentRequest object of URLContentRequest type
    # @param queryCallback function for queries execution
    # @return URLContentResponse object
    def _processSingleRequest(self, urlContentRequest, queryCallback):
        self.calcEmptyFields(urlContentRequest)
        StatisticLogManager.logUpdate(queryCallback, "LOG_URL_CONTENT", urlContentRequest, urlContentRequest.siteId,
                                      urlContentRequest.urlMd5)
        return self.getURLContent(urlContentRequest, queryCallback)


    # #calcEmptyFields method calculate values of empty fields
    #
    # An empty (or None) siteId becomes "0". An empty (or None) urlMd5 is computed from the url
    # via fillMD5().
    #
    # @param urlContentRequest object of URLContentRequest type
    def calcEmptyFields(self, urlContentRequest):
        if urlContentRequest.siteId is None or urlContentRequest.siteId == "":
            urlContentRequest.siteId = "0"
        if urlContentRequest.urlMd5 is None or urlContentRequest.urlMd5 == "":
            urlContentRequest.urlMd5 = urlContentRequest.fillMD5(urlContentRequest.url)


    # #generates and returns dbFieldsDict
    #
    # For every (non-None) requested field name, the value is chosen as follows:
    # - the row value, if the row has that column;
    # - otherwise the default value, if one is given;
    # - otherwise None.
    # Columns listed in DB_DATE_FIELDS are converted with str().
    #
    # @param dbFieldsList - list of db fields names
    # @param dbFieldsListDefaultValues - dict with default values for DB fields names (may be None)
    # @param row db row, not None
    # @return just generated dbFieldsDict dict
    def genDBFields(self, dbFieldsList, dbFieldsListDefaultValues, row):
        defaults = dbFieldsListDefaultValues if dbFieldsListDefaultValues is not None else {}
        ret = {}
        for fName in dbFieldsList:
            if fName is None:
                continue
            if fName in row:
                value = row[fName]
                # NOTE(review): a NULL date column becomes the string "None"; kept for compatibility.
                ret[str(fName)] = str(value) if fName in self.DB_DATE_FIELDS else value
            elif fName in defaults:
                # Previously defaults were always overwritten with None here.
                ret[str(fName)] = defaults[fName]
            else:
                ret[str(fName)] = None
        return ret


    # #read content from KVDB if CONTENT_TYPE_PROCESSED have setted
    #
    # @param dataDir - contains file directory (not used by this method)
    # @param urlContentRequest object of URLContentRequest type
    # @param contentMask - content type mask passed to ProcessedContentInternalStruct.parseProcessedBuf
    # @param queryCallback function for queries execution
    # @return list of Content objects (empty list when nothing was found)
    def contentProcessed(self, dataDir, urlContentRequest, contentMask, queryCallback):  # pylint: disable=W0613
        ret = []
        dataFetchRequest = dc.EventObjects.DataFetchRequest(urlContentRequest.siteId, urlContentRequest.urlMd5)
        dataFetchResponse = self.dbDataTask.process(dataFetchRequest, queryCallback)
        if dataFetchResponse is not None:
            resultDict = dataFetchResponse.resultDict
            dataField = ProcessedContentInternalStruct.DATA_FIELD
            cDateField = ProcessedContentInternalStruct.CDATE_FIELD
            if resultDict and \
               dataField in resultDict and resultDict[dataField] is not None and \
               cDateField in resultDict and resultDict[cDateField] is not None:
                ret = ProcessedContentInternalStruct.parseProcessedBuf(resultDict[dataField],
                                                                       resultDict[cDateField], contentMask)
            # Guarded: the response may be None, which has no __dict__.
            logger.debug(">>> UrlContent result = %s", str(dataFetchResponse.__dict__))
        logger.debug(">>> ret_content == %s", str(ret))
        return ret


    # #extract url fields from Database
    #
    # @param siteId site Id
    # @param urlMD5 urls urlMD5
    # @param queryCallback function for queries execution
    # @return first row of SQL request, or None
    def selectURLFromMySQL(self, siteId, urlMD5, queryCallback):
        row = None
        tableName = Constants.DC_URLS_TABLE_NAME_TEMPLATE % siteId
        # queryCallback only accepts a ready SQL string, so escape the request-supplied value.
        # Backslashes are escaped first, then quotes are doubled; valid MD5 hex is left unchanged.
        escapedUrlMd5 = ("%s" % (urlMD5,)).replace("\\", "\\\\").replace("'", "''")
        query = "SELECT * FROM %s WHERE `URLMd5` = '%s'" % (tableName, escapedUrlMd5)
        res = queryCallback(query, Constants.SECONDARY_DB_ID, Constants.EXEC_NAME)
        if hasattr(res, '__iter__') and len(res) >= 1:
            row = res[0]
        return row


    # #fillLists - fills incoming list of file's content
    #
    # Appends a Content object holding the base64 of the file bytes. It appends None when the
    # file is missing or unreadable.
    #
    # @param filePath - path to the content file
    # @param elemList - incoming filled list
    # @param typeId - Content type id stored in the Content object
    def fillLists(self, filePath, elemList, typeId=dc.EventObjects.Content.CONTENT_RAW_CONTENT):
        if not os.path.isfile(filePath):
            elemList.append(None)
            logger.debug(">>> No file = %s", str(filePath))
            return
        try:
            with open(filePath, "rb") as fd:
                rawContent = fd.read()
            localDate = datetime.fromtimestamp(os.path.getctime(filePath))
        except (IOError, OSError) as err:
            elemList.append(None)
            logger.debug(">>> IOError with file = %s MSG = %s", str(filePath), str(err))
            return
        # Encode the raw bytes, as contentRaw() does. Decoding to unicode first used to crash on non-ASCII data.
        elemList.append(dc.EventObjects.Content(base64.b64encode(rawContent), localDate.isoformat(' '), typeId))


    # #contentRaw - content reader
    #
    # @param fList - incoming file list
    # @param isBreak - break after first element or not
    # @param contentTypeId - Content type id stored in the produced Content objects
    # @param parseAdditionType - also read companion headers/requests/meta/cookies files
    def contentRaw(self, fList, isBreak, contentTypeId, parseAdditionType):
        for filePath in fList:
            # Per-file flag: companion files are read only for raw files that were read successfully.
            wasOpen = False
            if os.path.isfile(filePath):
                try:
                    with open(filePath, "rb") as fd:
                        rawContent = fd.read()
                    localDate = datetime.fromtimestamp(os.path.getctime(filePath))
                    self.rawContents.append(dc.EventObjects.Content(base64.b64encode(rawContent),
                                                                    localDate.isoformat(' '), contentTypeId))
                    wasOpen = True
                except (IOError, OSError) as err:
                    logger.debug(">>> IOError with file = %s MSG = %s", str(filePath), str(err))

            if wasOpen and parseAdditionType:
                self._fillCompanionFiles(filePath)
            if isBreak:
                break


    # #_fillCompanionFiles - reads headers/requests/meta/cookies files stored next to a raw content file
    #
    # @param rawFilePath - path of the raw content file (expected to end with RAW_DATA_SUFF)
    def _fillCompanionFiles(self, rawFilePath):
        reqCls = dc.EventObjects.URLContentRequest
        contentCls = dc.EventObjects.Content
        rawSuff = DC_CONSTANTS.RAW_DATA_SUFF
        # Strip the raw suffix only when present; an empty suffix must not truncate the path.
        if rawSuff and rawFilePath.endswith(rawSuff):
            basePath = rawFilePath[:-len(rawSuff)]
        else:
            basePath = rawFilePath
        companions = ((reqCls.CONTENT_TYPE_HEADERS, DC_CONSTANTS.RAW_DATA_HEADERS_SUFF, self.headers,
                       contentCls.CONTENT_HEADERS_CONTENT),
                      (reqCls.CONTENT_TYPE_REQUESTS, DC_CONSTANTS.RAW_DATA_REQESTS_SUFF, self.requests,
                       contentCls.CONTENT_REQUESTS_CONTENT),
                      (reqCls.CONTENT_TYPE_META, DC_CONSTANTS.RAW_DATA_META_SUFF, self.meta,
                       contentCls.CONTENT_META_CONTENT),
                      (reqCls.CONTENT_TYPE_COOKIES, DC_CONSTANTS.RAW_DATA_COOKIES_SUFF, self.cookies,
                       contentCls.CONTENT_COOKIES_CONTENT))
        for typeMask, suff, elemList, typeId in companions:
            if self.contentMask & typeMask:
                self.fillLists(basePath + suff, elemList, typeId)


    # #contentRawCommon - common content reader
    #
    # @param dataDir - contains file directory
    # @param localReverse - file reverse sorting by ctime (boolean)
    # @param allFiles - all files read or not (boolean)
    # @param rawDataSuff - file suffix to glob for
    # @param contentTypeId - Content type id stored in the produced Content objects
    # @param parseAdditionType - also read companion headers/requests/meta/cookies files
    def contentRawCommon(self, dataDir, localReverse=False, allFiles=False, rawDataSuff=DC_CONSTANTS.RAW_DATA_SUFF,
                         contentTypeId=dc.EventObjects.Content.CONTENT_RAW_CONTENT, parseAdditionType=True):
        fileMask = dataDir + "/*" + rawDataSuff
        logger.debug(">>> contentRaw fList = %s", str(fileMask))
        fList = sorted(glob.glob(fileMask), key=os.path.getctime, reverse=localReverse)
        self.contentRaw(fList, not allFiles, contentTypeId, parseAdditionType)


    # #_readRawVariants - reads files with the given suffix according to RAW_LAST / RAW_FIRST / RAW_ALL bits
    #
    # @param dataDir - raw data file storage dir
    # @param rawDataSuff - raw data file suffix
    # @param contentTypeId - Content type id stored in the produced Content objects
    # @param parseAdditionType - also read companion headers/requests/meta/cookies files
    def _readRawVariants(self, dataDir, rawDataSuff, contentTypeId, parseAdditionType):
        reqCls = dc.EventObjects.URLContentRequest
        if self.contentMask & reqCls.CONTENT_TYPE_RAW_LAST:
            self.contentRawCommon(dataDir, True, False, rawDataSuff, contentTypeId, parseAdditionType)
        if self.contentMask & reqCls.CONTENT_TYPE_RAW_FIRST:
            self.contentRawCommon(dataDir, False, False, rawDataSuff, contentTypeId, parseAdditionType)
        if self.contentMask & reqCls.CONTENT_TYPE_RAW_ALL:
            self.contentRawCommon(dataDir, False, True, rawDataSuff, contentTypeId, parseAdditionType)


    # #fillAdditionContentTypes fills result with contents of addition raw content types.
    #
    # @param typeMask - typeMask of supported content type
    # @param typeId - content type id, needs for filling Content obj
    # @param suff - raw data file suffix
    # @param dataDir - raw data file storage dir
    def fillAdditionContentTypes(self, typeMask, typeId, suff, dataDir):
        if self.contentMask & typeMask:
            self._readRawVariants(dataDir, suff, typeId, False)


    # #extract url content from mandatory storage - implemented RAW!!
    #
    # @param urlContentRequest instance of URLContentRequest objects
    # @param queryCallback function for queries execution
    # @return URLContentResponse object
    def getURLContent(self, urlContentRequest, queryCallback):
        reqCls = dc.EventObjects.URLContentRequest
        contentCls = dc.EventObjects.Content
        # NOTE(review): siteId and urlMd5 come from the request and are joined into a filesystem
        # path without validation. Confirm callers cannot supply values such as "../".
        dataDir = "%s/%s/%s" % (self.rawDataDir, urlContentRequest.siteId,
                                PathMaker(urlContentRequest.urlMd5).getDir())
        self.clearContents()
        self.contentMask = urlContentRequest.contentTypeMask

        processedMask = (reqCls.CONTENT_TYPE_PROCESSED | reqCls.CONTENT_TYPE_PROCESSED_INTERNAL |
                         reqCls.CONTENT_TYPE_PROCESSED_CUSTOM)
        if self.contentMask & processedMask:
            processed = self.contentProcessed(dataDir, urlContentRequest, self.contentMask, queryCallback)
            self.processedContents.extend(processed or [])

        if self.contentMask & reqCls.CONTENT_TYPE_RAW:
            # Only plain raw content pulls in the headers/requests/meta/cookies companion files.
            self._readRawVariants(dataDir, DC_CONSTANTS.RAW_DATA_SUFF, contentCls.CONTENT_RAW_CONTENT, True)

        self.fillAdditionContentTypes(reqCls.CONTENT_TYPE_TIDY, contentCls.CONTENT_TIDY_CONTENT,
                                      DC_CONSTANTS.RAW_DATA_TIDY_SUFF, dataDir)
        self.fillAdditionContentTypes(reqCls.CONTENT_TYPE_DYNAMIC, contentCls.CONTENT_DYNAMIC_CONTENT,
                                      DC_CONSTANTS.RAW_DATA_DYNAMIC_SUFF, dataDir)
        self.fillAdditionContentTypes(reqCls.CONTENT_TYPE_CHAIN, contentCls.CONTENT_CHAIN_PARTS,
                                      DC_CONSTANTS.RAW_DATA_CHAIN_SUFF, dataDir)

        logger.debug("!!!!! self.processedContents: %s", Utils.varDump(self.processedContents, stringifyType=0,
                                                                       ensure_ascii=False, strTypeMaxLen=5000))

        ret = dc.EventObjects.URLContentResponse(urlContentRequest.url, self.rawContents, self.processedContents)
        ret.headers = self.headers
        ret.requests = self.requests
        ret.meta = self.meta
        ret.cookies = self.cookies

        row = self.selectURLFromMySQL(urlContentRequest.siteId, urlContentRequest.urlMd5, queryCallback)
        if row is not None:
            for columnName, attrName in self.ROW_TO_RESPONSE_FIELDS:
                if columnName in row:
                    setattr(ret, attrName, row[columnName])
            dbFieldsList = urlContentRequest.dbFieldsList
            if hasattr(dbFieldsList, '__iter__') and len(dbFieldsList) > 0:
                ret.dbFields = self.genDBFields(dbFieldsList, urlContentRequest.dbFieldsListDefaultValues, row)

        if self.contentMask & reqCls.CONTENT_TYPE_ATTRIBUTES:
            if ret.urlMd5 is not None and ret.urlMd5 != "" and ret.siteId is not None:
                ret.attributes = AttrFetchTask.fetchUrlsAttributesByNames(ret.siteId,
                                                                          ret.urlMd5,
                                                                          queryCallback,
                                                                          urlContentRequest.attributeNames)

        return ret
def contentRaw(self, fList, isBreak, contentTypeId, parseAdditionType)
def genDBFields(self, dbFieldsList, dbFieldsListDefaultValues, row)
def __init__(self, keyValueStorageDir, rawDataDir, dBDataTask, dcSiteTemplate, keyValueDefaultFile, dcStatTemplate, dcLogTemplate)
def process(self, urlContentRequests, queryCallback)
def selectURLFromMySQL(self, siteId, urlMD5, queryCallback)
def fillAdditionContentTypes(self, typeMask, typeId, suff, dataDir)
def contentProcessed(self, dataDir, urlContentRequest, contentMask, queryCallback)
def contentRawCommon(self, dataDir, localReverse=False, allFiles=False, rawDataSuff=DC_CONSTANTS.RAW_DATA_SUFF, contentTypeId=dc.EventObjects.Content.CONTENT_RAW_CONTENT, parseAdditionType=True)
def getURLContent(self, urlContentRequest, queryCallback)
def fillLists(self, filePath, elemList, typeId=dc.EventObjects.Content.CONTENT_RAW_CONTENT)
def calcEmptyFields(self, urlContentRequest)