HCE Project Python language Distributed Tasks Manager Application, Distributed Crawler Application and client API bindings.  2.0.0-chaika
Hierarchical Cluster Engine Python language binding
ClientInterfaceService.py
Go to the documentation of this file.
1 '''
2 Created on Apr 8, 2014
3 
4 @package: dc
5 @author: scorp, bgv
6 @link: http://hierarchical-cluster-engine.com/
7 @copyright: Copyright © 2013-2014 IOIX Ukraine
8 @license: http://hierarchical-cluster-engine.com/license/
9 @since: 0.1
10 '''
11 
12 import time
13 import logging
14 import ConfigParser
15 from dateutil.parser import parse
16 from app.BaseServerManager import BaseServerManager
17 import app.Utils as Utils # pylint: disable=F0401
18 import app.Consts as APP_CONSTS
19 from dc.Constants import EVENT_TYPES, LOGGER_NAME
20 from dc import EventObjects
21 import dc.Constants as DC_CONSTANTS
22 import transport.Consts as consts
23 import dtm.EventObjects
24 
25 
# Logger initialization.
# Module-level logger used by all methods of this module; it is obtained from the
# project helper Utils.MPLogger (presumably a multiprocessing-aware wrapper - confirm).
logger = Utils.MPLogger().getLogger()
28 
30 
# Config section and option names read through configParser.get()
CONFIG_SECTION = "ClientInterfaceService"
CONFIG_SERVER_HOST = "serverHost"
CONFIG_SERVER_PORT = "serverPort"
CONFIG_SITES_MANAGER = "clientSitesManager"
CONFIG_BATCH_TASKS_MANAGER_REALTIME = "clientBatchTasksManagerRealTime"
# Option name for the DRCE nodes count (also used as the configVars key)
CONFIG_DRCE_NODES = "DRCENodes"

# Parts of the connection names registered with addConnection():
# "<name>" + CONNECTION_PREFIX, e.g. "serverConnection"
SERVER_CONNECTION_NAME = "server"
CONNECTION_PREFIX = "Connection"
39 
# #constructor
# initialise all connections and event handlers
# @param configParser config parser object
# @param connectBuilderLight instance of ConnectBuilderLight object
#
def __init__(self, configParser, connectBuilderLight):
    '''
    Constructor.

    Reads the server end-point and the SitesManager / BatchTasksManagerRealTime
    addresses from the CONFIG_SECTION config section, builds and registers the
    three transport connections and installs the routing handlers for request
    and response events.
    '''
    BaseServerManager.__init__(self)

    serverHost = configParser.get(self.CONFIG_SECTION, self.CONFIG_SERVER_HOST)
    serverPort = configParser.get(self.CONFIG_SECTION, self.CONFIG_SERVER_PORT)
    server = serverHost + ":" + str(serverPort)
    self.sitesManager = configParser.get(self.CONFIG_SECTION, self.CONFIG_SITES_MANAGER)
    # Address of the real-time batch tasks manager; BATCH requests are routed to it
    self.batchTasksManagerRealTime = configParser.get(self.CONFIG_SECTION,
                                                      self.CONFIG_BATCH_TASKS_MANAGER_REALTIME)

    # DRCE nodes count from the application section, 1 when the option or the section is absent.
    # NOTE: ConfigParser.get() returns a string, consumers must convert it before comparing numerically.
    try:
        self.configVars[self.CONFIG_DRCE_NODES] = configParser.get(APP_CONSTS.CONFIG_APPLICATION_SECTION_NAME,
                                                                   self.CONFIG_DRCE_NODES)
    except (ConfigParser.NoOptionError, ConfigParser.NoSectionError):
        self.configVars[self.CONFIG_DRCE_NODES] = 1

    serverConnection = connectBuilderLight.build(consts.SERVER_CONNECT, server, consts.TCP_TYPE)
    sitesManagerConnection = connectBuilderLight.build(consts.CLIENT_CONNECT, self.sitesManager)
    batchTasksManagerRealTimeConnection = connectBuilderLight.build(consts.CLIENT_CONNECT,
                                                                    self.batchTasksManagerRealTime)

    # Connection names are "<name>" + CONNECTION_PREFIX; the route handlers send() by the same names
    self.addConnection(self.SERVER_CONNECTION_NAME + self.CONNECTION_PREFIX, serverConnection)
    self.addConnection(str(self.sitesManager) + self.CONNECTION_PREFIX, sitesManagerConnection)
    self.addConnection(str(self.batchTasksManagerRealTime) + self.CONNECTION_PREFIX,
                       batchTasksManagerRealTimeConnection)

    # Requests forwarded to the SitesManager
    sitesManagerRequests = (EVENT_TYPES.SITE_NEW, EVENT_TYPES.SITE_UPDATE, EVENT_TYPES.SITE_STATUS,
                            EVENT_TYPES.SITE_DELETE, EVENT_TYPES.SITE_CLEANUP, EVENT_TYPES.SITE_FIND,
                            EVENT_TYPES.URL_NEW, EVENT_TYPES.URL_STATUS, EVENT_TYPES.URL_UPDATE,
                            EVENT_TYPES.URL_FETCH, EVENT_TYPES.URL_DELETE, EVENT_TYPES.URL_CLEANUP,
                            EVENT_TYPES.URL_CONTENT, EVENT_TYPES.SQL_CUSTOM, EVENT_TYPES.URL_PUT,
                            EVENT_TYPES.URL_HISTORY, EVENT_TYPES.URL_STATS,
                            EVENT_TYPES.PROXY_NEW, EVENT_TYPES.PROXY_UPDATE, EVENT_TYPES.PROXY_DELETE,
                            EVENT_TYPES.PROXY_STATUS, EVENT_TYPES.PROXY_FIND,
                            EVENT_TYPES.ATTR_SET, EVENT_TYPES.ATTR_UPDATE, EVENT_TYPES.ATTR_DELETE,
                            EVENT_TYPES.ATTR_FETCH)
    # Responses that are replied back to the DC client
    clientResponses = (EVENT_TYPES.SITE_NEW_RESPONSE, EVENT_TYPES.SITE_UPDATE_RESPONSE,
                       EVENT_TYPES.SITE_STATUS_RESPONSE, EVENT_TYPES.SITE_DELETE_RESPONSE,
                       EVENT_TYPES.SITE_CLEANUP_RESPONSE, EVENT_TYPES.SITE_FIND_RESPONSE,
                       EVENT_TYPES.URL_NEW_RESPONSE, EVENT_TYPES.URL_STATUS_RESPONSE,
                       EVENT_TYPES.URL_UPDATE_RESPONSE, EVENT_TYPES.URL_FETCH_RESPONSE,
                       EVENT_TYPES.URL_DELETE_RESPONSE, EVENT_TYPES.URL_CLEANUP_RESPONSE,
                       EVENT_TYPES.URL_CONTENT_RESPONSE, EVENT_TYPES.SQL_CUSTOM_RESPONSE,
                       EVENT_TYPES.URL_PUT_RESPONSE, EVENT_TYPES.URL_HISTORY_RESPONSE,
                       EVENT_TYPES.URL_STATS_RESPONSE,
                       EVENT_TYPES.PROXY_NEW_RESPONSE, EVENT_TYPES.PROXY_UPDATE_RESPONSE,
                       EVENT_TYPES.PROXY_DELETE_RESPONSE, EVENT_TYPES.PROXY_STATUS_RESPONSE,
                       EVENT_TYPES.PROXY_FIND_RESPONSE,
                       EVENT_TYPES.ATTR_SET_RESPONSE, EVENT_TYPES.ATTR_UPDATE_RESPONSE,
                       EVENT_TYPES.ATTR_DELETE_RESPONSE, EVENT_TYPES.ATTR_FETCH_RESPONSE)

    for eventType in sitesManagerRequests:
        self.setEventHandler(eventType, self.onSitesManagerRoute)
    for eventType in clientResponses:
        self.setEventHandler(eventType, self.onDCClientRoute)
    self.setEventHandler(EVENT_TYPES.BATCH, self.onBatchTasksManagerRealTimeRoute)
    self.setEventHandler(EVENT_TYPES.BATCH_RESPONSE, self.onDCClientRoute)

    # map of incoming event, which are in processing
    # event.uid => event without eventObj field
    self.processEvents = dict()
139 
140 
141 
# #handler to route all event to SitesManager
#
# Sends the request event to the SitesManager connection and, only after a successful
# send, registers it (by uid) so the response can be matched later. Errors are logged,
# not raised.
#
# @param event instance of Event object
def onSitesManagerRoute(self, event):
    try:
        logger.debug("Received event: %s", Utils.varDump(event))
        self.send(str(self.sitesManager) + self.CONNECTION_PREFIX, event)
        self.registerEvent(event)
    except KeyError as err:
        logger.error("%s", err)
    except Exception as err:  # pylint: disable=broad-except
        logger.error("Error `%s`", str(err))
154 
155 
156 
# #handler to route all event to BatchTasksManagerRealTime
#
# Sends the request event to the BatchTasksManagerRealTime connection and, only after a
# successful send, registers it (by uid) so the response can be matched later. Errors are
# logged, not raised.
#
# @param event instance of Event object
def onBatchTasksManagerRealTimeRoute(self, event):
    try:
        logger.debug("Received event: %s", Utils.varDump(event))
        self.send(str(self.batchTasksManagerRealTime) + self.CONNECTION_PREFIX, event)
        self.registerEvent(event)
    except KeyError as err:
        logger.error("%s", err)
    except Exception as err:  # pylint: disable=broad-except
        logger.error("Error `%s`", str(err))
169 
170 
171 
# #handler to route all response event to DCClient
#
# Looks up the request registered under the response's uid, merges multi-host results
# unless the cookie sets DC_CONSTANTS.MERGE_PARAM_NAME to a false value, replies to the
# client and drops the request from the processing map. A failing merge is logged and the
# unmerged results are replied; the request is unregistered even if the reply fails.
# An unknown uid (KeyError) is logged.
#
# @param event instance of Event object
def onDCClientRoute(self, event):
    try:
        requestEvent = self.getRequestEvent(event)
        try:
            cookie = event.cookie
            if isinstance(cookie, dict) and DC_CONSTANTS.MERGE_PARAM_NAME in cookie and \
               not bool(cookie[DC_CONSTANTS.MERGE_PARAM_NAME]):
                logger.debug("No merge results specified in Event.cookie!")
            else:
                logger.debug("Results merge try by default.")
                try:
                    event.eventObj = self.mergeResultsData(event.eventType, event.eventObj, cookie)
                except Exception as err:  # pylint: disable=broad-except
                    # A merge failure must not leave the client without any reply
                    logger.error("Results merge failed, unmerged results will be sent: %s", str(err))
            self.reply(requestEvent, event)
            logger.debug("Results sent to DCC.")
        finally:
            # Always drop the request, otherwise it would stay in processEvents forever
            self.unregisterEvent(requestEvent)
    except KeyError as err:
        logger.error("%s", err)
189 
190 
191 
# #Store an event in the map of events being processed
#
# The event's payload (eventObj) is cleared first, so only the request envelope is kept,
# keyed by event.uid.
#
# @param event instance of Event object
def registerEvent(self, event):
    event.eventObj = None
    self.processEvents.update({event.uid: event})
198 
199 
200 
# #Return the registered request event that has the same uid as the given event
#
# @param event instance of Event object (typically a response)
# @return event instance of Event object; raises KeyError when no such uid is registered
def getRequestEvent(self, event):
    pending = self.processEvents
    return pending[event.uid]
206 
207 
208 
# #Remove an event from the map of events being processed
#
# Raises KeyError when event.uid is not registered.
#
# @param event instance of Event object
def unregisterEvent(self, event):
    self.processEvents.pop(event.uid)
214 
215 
216 
# #Merge results of operation request to represent results from several hosts as single merged data
#
# Only an EventObjects.ClientResponse with more than one ClientResponseItem is merged.
# Items whose itemObject is None are skipped. The merged items' host/port/node/time/errorCode
# values (and their non-empty errorMessage values) are joined with ";" into the first item,
# which becomes the only item of the list. Its itemObject is the result of the
# response-type-specific merge method. URL_CONTENT and URL_FETCH results are then sorted when
# the cookie carries a non-empty order criterion. Response types without a merge method leave
# eventObj unchanged.
#
# @param eventType type of event, see constants definition
# @param eventObj response object for requested operation
# @param eventCookie the event's cookie (dict or None)
# @return eventObj object changed or not
def mergeResultsData(self, eventType, eventObj, eventCookie):
    if not isinstance(eventObj, EventObjects.ClientResponse):
        logger.error("Wrong eventObj type " + str(type(eventObj)) + " expected EventObjects.ClientResponse\n" + \
                     Utils.varDump(eventObj))
        return eventObj
    # Zero or one host response: nothing to merge, leave untouched
    if len(eventObj.itemsList) <= 1:
        return eventObj
    firstItem = eventObj.itemsList[0]
    if not isinstance(firstItem, EventObjects.ClientResponseItem):
        logger.error("Wrong eventObj.itemsList[0] type " + str(type(firstItem)) + \
                     " expected EventObjects.ClientResponseItem\n" + Utils.varDump(firstItem))
        return eventObj

    # Response type => merge method; URL_STATUS responses are merged like URL_FETCH ones
    mergers = {EVENT_TYPES.SITE_FIND_RESPONSE: self.mergeResultsSiteFind,
               EVENT_TYPES.URL_FETCH_RESPONSE: self.mergeResultsURLFetch,
               EVENT_TYPES.URL_STATUS_RESPONSE: self.mergeResultsURLFetch,
               EVENT_TYPES.URL_CONTENT_RESPONSE: self.mergeResultsURLContent,
               EVENT_TYPES.SITE_STATUS_RESPONSE: self.mergeResultsSiteStatus,
               EVENT_TYPES.BATCH_RESPONSE: self.mergeResultsBatch}
    # Types that need no special field processing, only GeneralResponse accumulation
    for generalType in (EVENT_TYPES.SITE_NEW_RESPONSE, EVENT_TYPES.SITE_UPDATE_RESPONSE,
                        EVENT_TYPES.SITE_DELETE_RESPONSE, EVENT_TYPES.SITE_CLEANUP_RESPONSE,
                        EVENT_TYPES.URL_NEW_RESPONSE, EVENT_TYPES.URL_UPDATE_RESPONSE,
                        EVENT_TYPES.URL_DELETE_RESPONSE, EVENT_TYPES.URL_CLEANUP_RESPONSE):
        mergers.setdefault(generalType, self.mergeResultsGeneralResponse)
    merger = mergers.get(eventType)

    newItemObject = None
    hosts, ports, nodes, times, errorMessages, errorCodes = [], [], [], [], [], []
    mergedCounter = 0
    logger.debug("Merging, response items: %s", str(len(eventObj.itemsList)))
    for clientResponseItem in eventObj.itemsList:
        if clientResponseItem.itemObject is None:
            # Object is null or wrong type
            logger.error("The clientResponseItem.itemObject is None!")
            continue
        mergedCounter += 1
        logger.debug("clientResponseItem:\n%s", Utils.varDump(clientResponseItem))
        hosts.append(clientResponseItem.host)
        ports.append(str(clientResponseItem.port))
        nodes.append(clientResponseItem.node)
        times.append(str(clientResponseItem.time))
        # Messages are ";"-separated like the other fields; empty ones are skipped so an
        # all-success merge keeps an empty message
        if clientResponseItem.errorMessage:
            errorMessages.append(clientResponseItem.errorMessage)
        errorCodes.append(str(clientResponseItem.errorCode))
        if merger is not None:
            newItemObject = merger(newItemObject, clientResponseItem)

    if newItemObject is None:
        logger.debug("No items collected while merge procedure, merge skipped.")
        return eventObj

    # Replace items list with the first item carrying the merged data
    eventObj.itemsList = [firstItem]
    firstItem.errorCode = ";".join(errorCodes)
    firstItem.errorMessage = ";".join(errorMessages)
    firstItem.itemObject = newItemObject
    firstItem.host = ";".join(hosts)
    firstItem.port = ";".join(ports)
    firstItem.node = ";".join(nodes)
    firstItem.time = ";".join(times)
    itemsNumber = len(newItemObject) if isinstance(newItemObject, list) else 1
    logger.debug("Merged with %s response objects, %s merged items.", str(mergedCounter), str(itemsNumber))

    # Response type => (sort method, name used in the log message)
    sorters = {EVENT_TYPES.URL_CONTENT_RESPONSE: (self.sortURLContentResults, "URL_CONTENT"),
               EVENT_TYPES.URL_FETCH_RESPONSE: (self.sortURLFetchResults, "URL_FETCH")}
    sortCriterion = None
    if eventType in sorters and isinstance(eventCookie, dict):
        orders = eventCookie.get(EventObjects.URLFetch.CRITERION_ORDER)
        # TODO: now use only criterion of the first item of request items list
        if orders and orders[0].strip() != '':
            sortCriterion = orders[0]
    if sortCriterion is None:
        logger.debug("Sort conditions are not satisfied, results not sorted.")
    else:
        sorter, label = sorters[eventType]
        firstItem.itemObject = sorter(newItemObject, sortCriterion)
        logger.debug(label + " results sorted by the: %s", str(sortCriterion))

    return eventObj
329 
330 
331 
# #Sort results of operation URL_CONTENT
#
# Only the first comma-separated criterion is used, in the form "<Field> [ASC|DESC]".
# The direction is case-insensitive: no direction or ASC sorts ascending, any other word
# sorts descending. An empty criterion sorts by CDate, and unknown fields fall back to CDate.
# Date fields (PDate, CDate, UDate, TcDate) are parsed and compared as Unix timestamps,
# with a None date mapped to -1. Size, Depth, TagsCount and ContentURLMd5 are compared as
# stored. Any error is logged and the input is returned unsorted.
#
# @param itemObject list of items to sort; each item exposes a dbFields dict
# @param criterion sort criterion string, e.g. "PDate DESC"
# @return new sorted list, or itemObject itself when sorting failed
def sortURLContentResults(self, itemObject, criterion):
    ret = itemObject

    try:
        # "CDate DESC,Size" -> ["CDate", "DESC"]; split() also tolerates repeated spaces
        crits = criterion.split(',')[0].split()
        fieldName = crits[0] if crits else 'CDate'
        descending = len(crits) > 1 and crits[1].upper() != 'ASC'

        def dateKey(name):
            def orderKey(item):
                value = item.dbFields[name]
                if value is None:
                    return -1
                # Numeric key: portable, unlike strftime('%s'), and compares correctly
                return time.mktime(parse(value).timetuple())
            return orderKey

        def fieldKey(name):
            def orderKey(item):
                return item.dbFields[name]
            return orderKey

        orderKeys = {'PDate': dateKey('PDate'),
                     'CDate': dateKey('CDate'),
                     'UDate': dateKey('UDate'),
                     'TcDate': dateKey('TcDate'),
                     'Size': fieldKey('Size'),
                     'Depth': fieldKey('Depth'),
                     'TagsCount': fieldKey('TagsCount'),
                     'ContentURLMd5': fieldKey('ContentURLMd5')}
        orderKey = orderKeys.get(fieldName, orderKeys['CDate'])
        ret = sorted(ret, key=orderKey, reverse=descending)
    except Exception as err:  # pylint: disable=broad-except
        logger.error("Exception: '%s', criterion: '%s'", str(err), str(criterion))

    return ret
391 
392 
# #Sort results of operation URL_FETCH
#
# Only the first comma-separated criterion is used, in the form "<Field> [ASC|DESC]".
# The direction is case-insensitive: no direction or ASC sorts ascending, any other word
# sorts descending. An empty criterion sorts by CDate, and unknown fields fall back to CDate.
# Date fields (PDate, CDate, UDate, TcDate -> attributes pDate, CDate, UDate, tcDate) are
# parsed and compared as Unix timestamps, with a None date mapped to -1. Size, Depth,
# TagsCount and ContentURLMd5 (attributes size, depth, tagsCount, contentURLMd5) are compared
# as stored. Any error is logged and the input is returned unsorted.
#
# @param itemObject list of URL objects to sort
# @param criterion sort criterion string, e.g. "UDate DESC"
# @return new sorted list, or itemObject itself when sorting failed
def sortURLFetchResults(self, itemObject, criterion):
    ret = itemObject

    try:
        # "CDate DESC,Size" -> ["CDate", "DESC"]; split() also tolerates repeated spaces
        crits = criterion.split(',')[0].split()
        fieldName = crits[0] if crits else 'CDate'
        descending = len(crits) > 1 and crits[1].upper() != 'ASC'

        def dateKey(attrName):
            def orderKey(item):
                value = getattr(item, attrName)
                # A missing date no longer aborts the whole sort
                if value is None:
                    return -1
                return time.mktime(parse(value).timetuple())
            return orderKey

        def fieldKey(attrName):
            def orderKey(item):
                return getattr(item, attrName)
            return orderKey

        orderKeys = {'PDate': dateKey('pDate'),
                     'CDate': dateKey('CDate'),
                     'UDate': dateKey('UDate'),
                     'TcDate': dateKey('tcDate'),
                     'Size': fieldKey('size'),
                     'Depth': fieldKey('depth'),
                     'TagsCount': fieldKey('tagsCount'),
                     'ContentURLMd5': fieldKey('contentURLMd5')}
        orderKey = orderKeys.get(fieldName, orderKeys['CDate'])
        ret = sorted(ret, key=orderKey, reverse=descending)
    except Exception as err:  # pylint: disable=broad-except
        logger.error("Exception: %s", str(err))

    return ret
448 
449 
450 
# #Merge results of operation URL_CONTENT
#
# Items with neither raw nor processed contents are skipped. A candidate duplicates an
# accumulated item when both have the same siteId and either the same urlMd5, the same
# non-empty rawContentMd5, or the same non-empty contentURLMd5.
# - A duplicate replaces the existing item in place when it is better: the existing item has
#   Batch_Id 0 and the candidate a non-zero one, or the candidate has a greater Status, or a
#   later UDate (experimental).
# - Otherwise the duplicate is rejected.
# - Non-duplicates are appended.
# Every kept item gets host = clientResponseItem.host. A missing Batch_Id is written back as 0.
#
# @param newItemsList new itemsList for new merged itemObject (None on the first call)
# @param clientResponseItem item whose itemObject is the list of URLContentResponse objects
# @return new itemsList for merged itemObject
def mergeResultsURLContent(self, newItemsList, clientResponseItem):
    itemsList = clientResponseItem.itemObject

    # If this is first item in results list, init itemObject with empty list
    if newItemsList is None:
        newItemsList = []

    def isSameContent(existing, candidate):
        if existing.siteId != candidate.siteId:
            return False
        if existing.urlMd5 == candidate.urlMd5:
            return True
        if candidate.rawContentMd5 not in ('', '""') and existing.rawContentMd5 == candidate.rawContentMd5:
            return True
        return candidate.contentURLMd5 not in ('', '""') and existing.contentURLMd5 == candidate.contentURLMd5

    def isBetterContent(existing, candidate):
        if 'Batch_Id' not in existing.dbFields:
            existing.dbFields['Batch_Id'] = 0
        if 'Batch_Id' not in candidate.dbFields:
            candidate.dbFields['Batch_Id'] = 0
        existingUDate = 0
        candidateUDate = 0
        if 'UDate' in existing.dbFields:
            existingUDate = self.getUnixTimeFromString(existing.dbFields['UDate'])
        if 'UDate' in candidate.dbFields:
            candidateUDate = self.getUnixTimeFromString(candidate.dbFields['UDate'])
        return (int(existing.dbFields['Batch_Id']) == 0 and int(candidate.dbFields['Batch_Id']) != 0) or \
               (candidate.dbFields['Status'] > existing.dbFields['Status']) or \
               (candidateUDate > existingUDate)

    if isinstance(itemsList, list):
        for urlContentResponse in itemsList:
            if len(urlContentResponse.rawContents) > 0 or len(urlContentResponse.processedContents) > 0:
                duplicate = False
                for i, itemObject in enumerate(newItemsList):
                    if not isSameContent(itemObject, urlContentResponse):
                        continue
                    # A duplicate is either replaced in place or rejected - never appended again
                    duplicate = True
                    if isBetterContent(itemObject, urlContentResponse):
                        logger.debug("Already exists itemObject:\n" + Utils.varDump(itemObject.dbFields) + \
                                     "\nand replaced urlContentResponse:\n" + Utils.varDump(urlContentResponse.dbFields))
                        urlContentResponse.host = clientResponseItem.host
                        newItemsList[i] = urlContentResponse
                    else:
                        logger.debug("Already exists urlContentResponse:\n" + Utils.varDump(urlContentResponse))
                        logger.debug("Rejected urlContentResponse")
                    break
                if not duplicate:
                    urlContentResponse.host = clientResponseItem.host
                    newItemsList.append(urlContentResponse)
                    logger.debug("Added urlContentResponse:\n" + Utils.varDump(urlContentResponse))
            else:
                logger.debug("Empty contents lists in urlContentResponse:\n" + Utils.varDump(urlContentResponse))
    else:
        # Object is null or wrong type
        logger.error("Wrong type of clientResponseItem.itemObject\n" + Utils.varDump(itemsList))

    return newItemsList
521 
522 
523 
# #Merge results of operation SITE_STATUS
#
# The first Site seen becomes the merged object; later ones are folded into it with
# mergeResultsSiteFields(). The merged object's host is set to the current item's host.
# A non-Site itemObject is logged and the merged object is returned unchanged.
#
# @param mergedSite merged Site object so far (None on the first call)
# @param clientResponseItem response item whose itemObject is expected to be a Site
# @return new merged Site object
def mergeResultsSiteStatus(self, mergedSite, clientResponseItem):
    currentSite = clientResponseItem.itemObject

    if not isinstance(currentSite, EventObjects.Site):
        logger.error("Wrong type of currentSite object: " + str(type(currentSite)) + ", Site expected\n" + \
                     Utils.varDump(currentSite))
        return mergedSite

    if mergedSite is not None:
        logger.debug("Merge with Site object:\n" + Utils.varDump(currentSite))
        mergedSite = self.mergeResultsSiteFields(mergedSite, currentSite)
    else:
        mergedSite = currentSite
    mergedSite.host = clientResponseItem.host

    return mergedSite
548 
549 
550 
# #Merge Site object fields
#
# Counters are summed, iterations take the maximum, errorMask is OR-ed, and avgSpeed /
# avgSpeedCounter take the minimum. The maxURLs / maxResources / maxErrors limits are summed
# only when more than one DRCE node is configured. siteToMerge is modified in place.
#
# @param siteToMerge Site object that is used as base
# @param siteMergeWith Site object that is merged with base
# @return merged siteToMerge object
def mergeResultsSiteFields(self, siteToMerge, siteMergeWith):
    siteToMerge.resources += siteMergeWith.resources
    siteToMerge.contents += siteMergeWith.contents
    siteToMerge.collectedURLs += siteMergeWith.collectedURLs
    siteToMerge.newURLs += siteMergeWith.newURLs
    siteToMerge.deletedURLs += siteMergeWith.deletedURLs
    siteToMerge.iterations = max(siteToMerge.iterations, siteMergeWith.iterations)
    siteToMerge.errors += siteMergeWith.errors
    siteToMerge.errorMask |= siteMergeWith.errorMask
    siteToMerge.size += siteMergeWith.size
    siteToMerge.avgSpeed = min(siteToMerge.avgSpeed, siteMergeWith.avgSpeed)
    siteToMerge.avgSpeedCounter = min(siteToMerge.avgSpeedCounter, siteMergeWith.avgSpeedCounter)

    # The nodes count may come from ConfigParser.get() as a string; compare it numerically
    # (in Python 2 any str is greater than any int, so "1" > 1 used to be True).
    # A value that is not an integer is treated as a single node.
    try:
        drceNodes = int(self.configVars[self.CONFIG_DRCE_NODES])
    except (TypeError, ValueError):
        drceNodes = 1
    if drceNodes > 1:
        siteToMerge.maxURLs += siteMergeWith.maxURLs
        siteToMerge.maxResources += siteMergeWith.maxResources
        siteToMerge.maxErrors += siteMergeWith.maxErrors

    return siteToMerge
576 
577 
578 
# #Merge results of operation SITE_FIND
#
# Sites are matched by id. A site already collected is merged in place with
# mergeResultsSiteFields(); a new one is appended. In both cases host is set to the current
# item's host. A non-list itemObject is logged and the collected list is returned as is.
#
# @param newItemsList collected Site list so far (None on the first call)
# @param clientResponseItem response item whose itemObject is a list of Site objects
# @return new itemsList for merged itemObject
def mergeResultsSiteFind(self, newItemsList, clientResponseItem):
    merged = [] if newItemsList is None else newItemsList
    candidates = clientResponseItem.itemObject

    if not isinstance(candidates, list):
        logger.error("Wrong type of clientResponseItem.itemObject\n" + Utils.varDump(candidates))
        return merged

    for site in candidates:
        existing = next((known for known in merged if known.id == site.id), None)
        if existing is None:
            site.host = clientResponseItem.host
            merged.append(site)
            logger.debug("Added Site:\n" + Utils.varDump(site))
        else:
            existing = self.mergeResultsSiteFields(existing, site)
            existing.host = clientResponseItem.host

    return merged
613 
614 
615 
# #Merge results of operation URL_FETCH
#
# URLs are matched by urlMd5. An already collected URL is replaced by the candidate when:
#   - the candidate's status is not lower and it is crawled, processed and batched, or
#   - both statuses are equal and only the candidate was crawled, or
#   - both statuses are equal, both were crawled, and the candidate's UDate is later
#     (experimental).
# New URLs are appended. Kept URLs get host = clientResponseItem.host.
#
# @param newItemsList new itemsList for new merged itemObject (None on the first call)
# @param clientResponseItem response item whose itemObject is a list of URL objects
# @return new itemsList for merged itemObject
def mergeResultsURLFetch(self, newItemsList, clientResponseItem):
    itemsList = clientResponseItem.itemObject
    isList = isinstance(itemsList, list)

    # len() only for a list: a wrong-typed itemObject must reach the error branch, not raise here
    logger.debug("Merging object of URLFEtch, host: %s, items: %s", str(clientResponseItem.host),
                 str(len(itemsList)) if isList else "n/a")
    # If this is first item in results list, init itemObject with empty list
    if newItemsList is None:
        newItemsList = []

    replacements = 0
    insertions = 0

    if isList:
        for url in itemsList:
            present = False
            for i, addedURL in enumerate(newItemsList):
                if addedURL.urlMd5 != url.urlMd5:
                    continue
                present = True
                logger.debug("URL found in list: %s", url.urlMd5)
                addedURLUDate = self.getUnixTimeFromString(addedURL.UDate)
                urUDate = self.getUnixTimeFromString(url.UDate)
                if (addedURL.status <= url.status and (url.crawled > 0 and url.processed > 0 and url.batchId > 0)) or \
                   (addedURL.status == url.status and addedURL.crawled == 0 and url.crawled > 0) or \
                   (addedURL.status == url.status and addedURL.crawled > 0 and url.crawled > 0 and urUDate > addedURLUDate):
                    logger.debug("URL replaced in list with best fields values: %s, old.status=%s, new.status=%s, " + \
                                 "old.crawled=%s, new.crawled=%s, old.processed=%s, new.processed=%s, old.batchId=%s, " + \
                                 "new.batchId=%s, old.UDate=%s, new.UDate=%s",
                                 url.urlMd5, str(addedURL.status), str(url.status), str(addedURL.crawled), str(url.crawled),
                                 str(addedURL.processed), str(url.processed), str(addedURL.batchId), str(url.batchId),
                                 str(addedURLUDate), str(urUDate))
                    url.host = clientResponseItem.host
                    newItemsList[i] = url
                    replacements += 1
                else:
                    logger.debug("URL not replaced cause conditions not matched, old url:\n%s\ncandidate url:\n%s",
                                 str(addedURL), str(url))
                break
            if not present:
                url.host = clientResponseItem.host
                newItemsList.append(url)
                logger.debug("Added URL: %s", str(url.urlMd5))
                insertions += 1
    else:
        # Object is null or wrong type
        logger.error("Wrong type of clientResponseItem.itemObject\n%s", Utils.varDump(itemsList))

    logger.debug("Merging object of URLFEtch finished, replacements: %s, insertions: %s", str(replacements),
                 str(insertions))

    return newItemsList
676 
677 
678 
# #Merge results of operation for GeneralResponse item object
#
# The first GeneralResponse becomes the merged object and takes the item's host. Later
# responses append their errorCode / errorMessage after a ";" and extend its statuses
# list. A wrong-typed itemObject is logged and the merged object is returned unchanged.
#
# @param mergedGeneralResponse merged GeneralResponse so far (None on the first call)
# @param clientResponseItem response item whose itemObject is a dtm GeneralResponse
# @return new merged GeneralResponse object
def mergeResultsGeneralResponse(self, mergedGeneralResponse, clientResponseItem):
    current = clientResponseItem.itemObject

    if not isinstance(current, dtm.EventObjects.GeneralResponse):
        logger.error("Wrong type of currentGeneralResponse object: " + str(type(current)) + \
                     ", dtm.GeneralResponse expected\n" + Utils.varDump(current))
        return mergedGeneralResponse

    if mergedGeneralResponse is None:
        mergedGeneralResponse = current
        mergedGeneralResponse.host = clientResponseItem.host
        logger.debug("Merge init GeneralResponse object:\n" + Utils.varDump(current))
        return mergedGeneralResponse

    logger.debug("Merge with GeneralResponse object:\n" + Utils.varDump(current))
    mergedGeneralResponse.errorCode = ";".join((str(mergedGeneralResponse.errorCode), str(current.errorCode)))
    mergedGeneralResponse.errorMessage = ";".join((str(mergedGeneralResponse.errorMessage),
                                                   str(current.errorMessage)))
    mergedGeneralResponse.statuses.extend(current.statuses)
    return mergedGeneralResponse
707 
708 
709 
# #Merge results of operation BATCH
#
# Every non-None element of the item's list is stamped with the item's host and appended;
# None elements are rejected. A non-list itemObject is logged and the collected list is
# returned as is.
#
# @param newItemsList collected items so far (None on the first call)
# @param clientResponseItem response item whose itemObject is a list of result objects
# @return new itemsList for merged itemObject
def mergeResultsBatch(self, newItemsList, clientResponseItem):
    collected = newItemsList if newItemsList is not None else []
    batchItems = clientResponseItem.itemObject

    if not isinstance(batchItems, list):
        logger.error("Wrong type of clientResponseItem.itemObject\n" + Utils.varDump(batchItems))
        return collected

    for candidate in batchItems:
        if candidate is None:
            logger.debug("Rejected urlContentResponse")
            continue
        candidate.host = clientResponseItem.host
        collected.append(candidate)
        logger.debug("Added urlContentResponse:\n" + Utils.varDump(candidate))

    return collected
741 
742 
# #Converts a string representation of a time to Unix time
#
# The string is parsed with time.strptime() as local time and converted with time.mktime().
# Any parse/convert error is logged and 0 is returned.
#
# @param buf - value to convert (passed through str())
# @param dateFormat - strptime format of the date representation
# @param valueType - 0 returns an int, any other value returns the float from mktime()
# @return Unix time in seconds, or 0 if the conversion failed
def getUnixTimeFromString(self, buf, dateFormat='%Y-%m-%d %H:%M:%S', valueType=0):
    try:
        seconds = time.mktime(time.strptime(str(buf), dateFormat))
    except Exception as err:  # pylint: disable=broad-except
        logger.error("Error get date from: `%s` with format: `%s` : %s", str(buf), dateFormat, str(err))
        return 0
    return int(seconds) if valueType == 0 else seconds
760 
def reply(self, event, reply_event)
Wrapper that sends reply_event as the reply to event.
def sortURLContentResults(self, itemObject, criterion)
def mergeResultsSiteStatus(self, mergedSite, clientResponseItem)
def mergeResultsSiteFind(self, newItemsList, clientResponseItem)
def __init__(self, configParser, connectBuilderLight)
def mergeResultsBatch(self, newItemsList, clientResponseItem)
GeneralResponse event object; represents a general state response for multipurpose usage.
def setEventHandler(self, eventType, handler)
Set the event handler; overwrites the current handler for eventType.
def mergeResultsURLContent(self, newItemsList, clientResponseItem)
def addConnection(self, name, connection)
This is app base class for management server connection end-points and parallel transport messages pr...
def mergeResultsGeneralResponse(self, mergedGeneralResponse, clientResponseItem)
def send(self, connect_name, event)
Send an event.
def mergeResultsURLFetch(self, newItemsList, clientResponseItem)
def mergeResultsData(self, eventType, eventObj, eventCookie)
def getUnixTimeFromString(self, buf, dateFormat='%Y-%m-%d %H:%M:%S', valueType=0)
def mergeResultsSiteFields(self, siteToMerge, siteMergeWith)