6 @link: http://hierarchical-cluster-engine.com/ 7 @copyright: Copyright © 2013-2014 IOIX Ukraine 8 @license: http://hierarchical-cluster-engine.com/license/ 20 from dc
import EventObjects
31 CONFIG_SECTION =
"ClientInterfaceService" 32 CONFIG_SERVER_HOST =
"serverHost" 33 CONFIG_SERVER_PORT =
"serverPort" 34 CONFIG_SITES_MANAGER =
"clientSitesManager" 35 SERVER_CONNECTION_NAME =
"server" 36 CONNECTION_PREFIX =
"Connection" 37 CONFIG_BATCH_TASKS_MANAGER_REALTIME =
"clientBatchTasksManagerRealTime" 38 CONFIG_DRCE_NODES =
"DRCENodes" 45 def __init__(self, configParser, connectBuilderLight):
49 BaseServerManager.__init__(self)
53 server = serverHost +
":" + str(serverPort)
60 except ConfigParser.NoOptionError:
63 serverConnection = connectBuilderLight.build(consts.SERVER_CONNECT, server, consts.TCP_TYPE)
64 sitesManagerConnection = connectBuilderLight.build(consts.CLIENT_CONNECT, self.
sitesManager)
65 batchTasksManagerRealTimeConnection = connectBuilderLight.build(consts.CLIENT_CONNECT,
71 batchTasksManagerRealTimeConnection)
147 logger.debug(
"Received event: " + Utils.varDump(event))
150 except KeyError
as err:
151 logger.error(err.message)
152 except Exception, err:
153 logger.error(
"Error `%s`", str(err))
162 logger.debug(
"Received event: " + Utils.varDump(event))
165 except KeyError
as err:
166 logger.error(err.message)
167 except Exception, err:
168 logger.error(
"Error `%s`", str(err))
178 if event.cookie
is not None and isinstance(event.cookie, dict)
and\
179 DC_CONSTANTS.MERGE_PARAM_NAME
in event.cookie
and bool(event.cookie[DC_CONSTANTS.MERGE_PARAM_NAME])
is False:
180 logger.debug(
"No merge results specified in Event.cookie!")
182 logger.debug(
"Results merge try by default.")
183 event.eventObj = self.
mergeResultsData(event.eventType, event.eventObj, event.cookie)
184 self.
reply(request_event, event)
185 logger.debug(
"Results sent to DCC.")
187 except KeyError
as err:
188 logger.error(err.message)
196 event.eventObj =
None 226 if len(eventObj.itemsList) > 1:
236 logger.debug(
"Merging, response items: %s", str(len(eventObj.itemsList)))
239 for clientResponseItem
in eventObj.itemsList:
241 if clientResponseItem.itemObject
is not None:
242 mergedCounter = mergedCounter + 1
243 logger.debug(
"clientResponseItem:\n" + Utils.varDump(clientResponseItem))
244 newHost += clientResponseItem.host +
";" 245 newPort += str(clientResponseItem.port) +
";" 246 newNode += clientResponseItem.node +
";" 247 newTime += str(clientResponseItem.time) +
";" 248 newErrorMessage += clientResponseItem.errorMessage
249 newErrorCode += str(clientResponseItem.errorCode) +
";" 250 if eventType == EVENT_TYPES.SITE_FIND_RESPONSE:
253 elif eventType == EVENT_TYPES.URL_FETCH_RESPONSE
or eventType == EVENT_TYPES.URL_STATUS_RESPONSE:
256 elif eventType == EVENT_TYPES.URL_CONTENT_RESPONSE:
259 elif eventType == EVENT_TYPES.SITE_STATUS_RESPONSE:
266 elif eventType == EVENT_TYPES.BATCH_RESPONSE:
270 l = {EVENT_TYPES.SITE_NEW_RESPONSE, EVENT_TYPES.SITE_UPDATE_RESPONSE, EVENT_TYPES.SITE_DELETE_RESPONSE,
271 EVENT_TYPES.SITE_CLEANUP_RESPONSE, EVENT_TYPES.URL_NEW_RESPONSE, EVENT_TYPES.URL_UPDATE_RESPONSE,
272 EVENT_TYPES.URL_DELETE_RESPONSE, EVENT_TYPES.URL_CLEANUP_RESPONSE, EVENT_TYPES.URL_UPDATE_RESPONSE,
273 EVENT_TYPES.URL_DELETE_RESPONSE, EVENT_TYPES.URL_CLEANUP_RESPONSE}
279 logger.error(
"The clientResponseItem.itemObject is None!")
281 if newItemObject
is not None:
282 eventObj.itemsList = [eventObj.itemsList[0]]
283 eventObj.itemsList[0].errorCode = newErrorCode.rstrip(
";")
284 eventObj.itemsList[0].errorMessage = newErrorMessage.rstrip(
";")
285 eventObj.itemsList[0].itemObject = newItemObject
286 eventObj.itemsList[0].host = newHost.rstrip(
";")
287 eventObj.itemsList[0].port = newPort.rstrip(
";")
288 eventObj.itemsList[0].node = newNode.rstrip(
";")
289 eventObj.itemsList[0].time = newTime.rstrip(
";")
290 if isinstance(newItemObject, list):
291 itemsNumber = len(newItemObject)
294 logger.debug(
"Merged with " + str(mergedCounter) +
" response objects, " + \
295 str(itemsNumber) +
" merged items.")
297 if eventType == EVENT_TYPES.URL_CONTENT_RESPONSE
and eventCookie
is not None\
298 and isinstance(eventCookie, dict)
and EventObjects.URLFetch.CRITERION_ORDER
in eventCookie
and\
299 len(eventCookie[EventObjects.URLFetch.CRITERION_ORDER]) > 0
and\
300 eventCookie[EventObjects.URLFetch.CRITERION_ORDER][0].strip() !=
'':
303 eventCookie[EventObjects.URLFetch.CRITERION_ORDER][0])
304 logger.debug(
"URL_CONTENT results sorted by the: %s",
305 str(eventCookie[EventObjects.URLFetch.CRITERION_ORDER][0]))
307 elif eventType == EVENT_TYPES.URL_FETCH_RESPONSE
and eventCookie
is not None\
308 and isinstance(eventCookie, dict)
and EventObjects.URLFetch.CRITERION_ORDER
in eventCookie
and\
309 len(eventCookie[EventObjects.URLFetch.CRITERION_ORDER]) > 0
and\
310 eventCookie[EventObjects.URLFetch.CRITERION_ORDER][0].strip() !=
'':
313 eventCookie[EventObjects.URLFetch.CRITERION_ORDER][0])
314 logger.debug(
"URL_CONTENT results sorted by the: %s",
315 str(eventCookie[EventObjects.URLFetch.CRITERION_ORDER][0]))
317 logger.debug(
"Sort conditions are not satisfied, results not sorted.")
319 logger.debug(
"No items collected while merge procedure, merge skipped.")
322 logger.error(
"Wrong eventObj.itemsList[0] type " + str(
type(eventObj.itemsList[0])) + \
323 " expected EventObjects.ClientResponseItem\n" + Utils.varDump(eventObj.itemsList[0]))
325 logger.error(
"Wrong eventObj type " + str(
type(eventObj)) +
" expected EventObjects.ClientResponse\n" + \
326 Utils.varDump(eventObj))
341 crits = criterion.split(
',')
343 crits = criterion.split(
' ')
345 crits = (
'CDate',
False)
346 elif len(crits) == 1:
347 crits = (crits[0],
False)
349 if crits[1] ==
'ASC':
354 if crits[0] ==
'PDate':
355 def orderKey(itemObject):
356 if itemObject.dbFields[
'PDate']
is not None:
357 return parse(itemObject.dbFields[
'PDate']).strftime(
'%s')
360 elif crits[0] ==
'CDate':
361 def orderKey(itemObject):
362 return parse(itemObject.dbFields[
'CDate']).strftime(
'%s')
363 elif crits[0] ==
'UDate':
364 def orderKey(itemObject):
365 return parse(itemObject.dbFields[
'UDate']).strftime(
'%s')
366 elif crits[0] ==
'TcDate':
367 def orderKey(itemObject):
368 return parse(itemObject.dbFields[
'TcDate']).strftime(
'%s')
369 elif crits[0] ==
'Size':
370 def orderKey(itemObject):
371 return itemObject.dbFields[
'Size']
372 elif crits[0] ==
'Depth':
373 def orderKey(itemObject):
374 return itemObject.dbFields[
'Depth']
375 elif crits[0] ==
'TagsCount':
376 def orderKey(itemObject):
377 return itemObject.dbFields[
'TagsCount']
378 elif crits[0] ==
'ContentURLMd5':
379 def orderKey(itemObject):
380 return itemObject.dbFields[
'ContentURLMd5']
382 def orderKey(itemObject):
383 return itemObject.dbFields[
'CDate']
385 ret = sorted(ret, key=orderKey, reverse=crits[1])
387 except Exception, err:
388 logger.error(
"Exception: '%s', criterion: '%s'", str(err), str(criterion))
402 crits = criterion.split(
',')
404 crits = criterion.split(
' ')
406 crits = (
'CDate',
False)
407 elif len(crits) == 1:
408 crits = (crits[0],
False)
410 if crits[1] ==
'ASC':
415 if crits[0] ==
'PDate':
416 def orderKey(itemObject):
417 return parse(itemObject.pDate).strftime(
'%s')
418 elif crits[0] ==
'CDate':
419 def orderKey(itemObject):
420 return parse(itemObject.CDate).strftime(
'%s')
421 elif crits[0] ==
'UDate':
422 def orderKey(itemObject):
423 return parse(itemObject.UDate).strftime(
'%s')
424 elif crits[0] ==
'TcDate':
425 def orderKey(itemObject):
426 return parse(itemObject.tcDate).strftime(
'%s')
427 elif crits[0] ==
'Size':
428 def orderKey(itemObject):
429 return itemObject.size
430 elif crits[0] ==
'Depth':
431 def orderKey(itemObject):
432 return itemObject.depth
433 elif crits[0] ==
'TagsCount':
434 def orderKey(itemObject):
435 return itemObject.tagsCount
436 elif crits[0] ==
'ContentURLMd5':
437 def orderKey(itemObject):
438 return itemObject.contentURLMd5
440 def orderKey(itemObject):
441 return itemObject.CDate
443 ret = sorted(ret, key=orderKey, reverse=crits[1])
444 except Exception
as err:
445 logger.error(
"Exception: %s", str(err))
457 itemsList = clientResponseItem.itemObject
460 if newItemsList
is None:
463 if isinstance(itemsList, list):
465 for urlContentResponse
in itemsList:
467 if len(urlContentResponse.rawContents) > 0
or len(urlContentResponse.processedContents) > 0:
469 for i, itemObject
in enumerate(newItemsList):
472 if itemObject.siteId == urlContentResponse.siteId
and\
473 (itemObject.urlMd5 == urlContentResponse.urlMd5
or\
474 (urlContentResponse.rawContentMd5 !=
'' and urlContentResponse.rawContentMd5 !=
'""' and\
475 itemObject.rawContentMd5 == urlContentResponse.rawContentMd5)
or\
476 (urlContentResponse.contentURLMd5 !=
'' and urlContentResponse.contentURLMd5 !=
'""' and\
477 itemObject.contentURLMd5 == urlContentResponse.contentURLMd5)):
480 if 'Batch_Id' not in itemObject.dbFields:
481 itemObject.dbFields[
'Batch_Id'] = 0
482 if 'Batch_Id' not in urlContentResponse.dbFields:
483 urlContentResponse.dbFields[
'Batch_Id'] = 0
486 urlContentResponseUDate = 0
487 if 'UDate' in itemObject.dbFields:
489 if 'UDate' in urlContentResponse.dbFields:
493 if (int(itemObject.dbFields[
'Batch_Id']) == 0
and int(urlContentResponse.dbFields[
'Batch_Id']) != 0)
or\
494 (urlContentResponse.dbFields[
'Status'] > itemObject.dbFields[
'Status'])
or\
495 (urlContentResponseUDate > itemObjectUDate):
497 logger.debug(
"Already exists itemObject:\n" + Utils.varDump(itemObject.dbFields) + \
498 "\nand replaced urlContentResponse:\n" + Utils.varDump(urlContentResponse.dbFields))
499 urlContentResponse.host = clientResponseItem.host
501 newItemsList[i] = urlContentResponse
504 logger.debug(
"Already exists urlContentResponse:\n" + Utils.varDump(urlContentResponse))
505 urlContentResponse =
None 508 if urlContentResponse
is not None:
509 urlContentResponse.host = clientResponseItem.host
510 newItemsList.append(urlContentResponse)
511 logger.debug(
"Added urlContentResponse:\n" + Utils.varDump(urlContentResponse))
513 logger.debug(
"Rejected urlContentResponse")
515 logger.debug(
"Empty contents lists in urlContentResponse:\n" + Utils.varDump(urlContentResponse))
518 logger.error(
"Wrong type of clientResponseItem.itemObject\n" + Utils.varDump(itemsList))
530 currentSite = clientResponseItem.itemObject
534 if mergedSite
is None:
535 mergedSite = currentSite
537 logger.debug(
"Merge with Site object:\n" + Utils.varDump(currentSite))
541 mergedSite.host = clientResponseItem.host
544 logger.error(
"Wrong type of currentSite object: " + str(
type(currentSite)) +
", Site expected\n" + \
545 Utils.varDump(currentSite))
558 siteToMerge.resources += siteMergeWith.resources
559 siteToMerge.contents += siteMergeWith.contents
560 siteToMerge.collectedURLs += siteMergeWith.collectedURLs
561 siteToMerge.newURLs += siteMergeWith.newURLs
562 siteToMerge.deletedURLs += siteMergeWith.deletedURLs
563 siteToMerge.iterations = max([siteToMerge.iterations, siteMergeWith.iterations])
564 siteToMerge.errors += siteMergeWith.errors
565 siteToMerge.errorMask |= siteMergeWith.errorMask
566 siteToMerge.size += siteMergeWith.size
567 siteToMerge.avgSpeed = min([siteToMerge.avgSpeed, siteMergeWith.avgSpeed])
568 siteToMerge.avgSpeedCounter = min([siteToMerge.avgSpeedCounter, siteMergeWith.avgSpeedCounter])
571 siteToMerge.maxURLs += siteMergeWith.maxURLs
572 siteToMerge.maxResources += siteMergeWith.maxResources
573 siteToMerge.maxErrors += siteMergeWith.maxErrors
585 itemsList = clientResponseItem.itemObject
588 if newItemsList
is None:
591 if isinstance(itemsList, list):
593 for site
in itemsList:
596 for addedSite
in newItemsList:
597 if addedSite.id == site.id:
600 addedSite.host = clientResponseItem.host
605 site.host = clientResponseItem.host
606 newItemsList.append(site)
607 logger.debug(
"Added Site:\n" + Utils.varDump(site))
610 logger.error(
"Wrong type of clientResponseItem.itemObject\n" + Utils.varDump(itemsList))
622 itemsList = clientResponseItem.itemObject
624 logger.debug(
"Merging object of URLFEtch, host: %s, items: %s", str(clientResponseItem.host), str(len(itemsList)))
626 if newItemsList
is None:
632 if isinstance(itemsList, list):
634 for url
in itemsList:
637 for i, addedURL
in enumerate(newItemsList):
638 if addedURL.urlMd5 == url.urlMd5:
639 logger.debug(
"URL found in list: %s", url.urlMd5)
645 if (addedURL.status <= url.status
and (url.crawled > 0
and url.processed > 0
and url.batchId > 0))
or\
646 (addedURL.status == url.status
and addedURL.crawled == 0
and url.crawled > 0)
or\
647 (addedURL.status == url.status
and addedURL.crawled > 0
and url.crawled > 0
and urUDate > addedURLUDate):
648 logger.debug(
"URL replaced in list with best fields values: %s, old.status=%s, new.status=%s, " + \
649 "old.crawled=%s, new.crawled=%s, old.processed=%s, new.processed=%s, old.batchId=%s, " + \
650 "new.batchId=%s, old.UDate=%s, new.UDate=%s",
651 url.urlMd5, str(addedURL.status), str(url.status), str(addedURL.crawled), str(url.crawled),
652 str(addedURL.processed), str(url.processed), str(addedURL.batchId), str(url.batchId),
653 str(addedURLUDate), str(urUDate))
654 url.host = clientResponseItem.host
655 newItemsList[i] = url
658 logger.debug(
"URL not replaced cause conditions not matched, old url:\n%s\ncandidate url:\n%s",
659 str(addedURL), str(url))
664 url.host = clientResponseItem.host
665 newItemsList.append(url)
666 logger.debug(
"Added URL: %s", str(url.urlMd5))
670 logger.error(
"Wrong type of clientResponseItem.itemObject\n%s", Utils.varDump(itemsList))
672 logger.debug(
"Merging object of URLFEtch finished, replacements: %s, insertions: %s", str(replacements),
685 currentGeneralResponse = clientResponseItem.itemObject
688 if mergedGeneralResponse
is None:
690 mergedGeneralResponse = currentGeneralResponse
691 mergedGeneralResponse.host = clientResponseItem.host
692 logger.debug(
"Merge init GeneralResponse object:\n" + Utils.varDump(currentGeneralResponse))
694 logger.debug(
"Merge with GeneralResponse object:\n" + Utils.varDump(currentGeneralResponse))
696 mergedGeneralResponse.errorCode = str(mergedGeneralResponse.errorCode) +
";" + \
697 str(currentGeneralResponse.errorCode)
698 mergedGeneralResponse.errorMessage = str(mergedGeneralResponse.errorMessage) +
";" + \
699 str(currentGeneralResponse.errorMessage)
700 mergedGeneralResponse.statuses.extend(currentGeneralResponse.statuses)
703 logger.error(
"Wrong type of currentGeneralResponse object: " + str(
type(currentGeneralResponse)) + \
704 ", dtm.GeneralResponse expected\n" + Utils.varDump(currentGeneralResponse))
706 return mergedGeneralResponse
716 itemsList = clientResponseItem.itemObject
719 if newItemsList
is None:
722 if isinstance(itemsList, list):
724 for urlContentResponse
in itemsList:
728 if urlContentResponse
is not None:
729 urlContentResponse.host = clientResponseItem.host
730 newItemsList.append(urlContentResponse)
731 logger.debug(
"Added urlContentResponse:\n" + Utils.varDump(urlContentResponse))
733 logger.debug(
"Rejected urlContentResponse")
738 logger.error(
"Wrong type of clientResponseItem.itemObject\n" + Utils.varDump(itemsList))
753 ret = time.mktime(time.strptime(str(buf), dateFormat))
756 except Exception
as err:
757 logger.error(
"Error get date from: `%s` with format: `%s` : %s", str(buf), dateFormat, str(err))
def reply(self, event, reply_event)
Wrapper for sending reply_event in reply to event.
def onBatchTasksManagerRealTimeRoute(self, event)
def sortURLContentResults(self, itemObject, criterion)
def mergeResultsSiteStatus(self, mergedSite, clientResponseItem)
def mergeResultsSiteFind(self, newItemsList, clientResponseItem)
batchTasksManagerRealTime
string CONFIG_SERVER_HOST
def __init__(self, configParser, connectBuilderLight)
def mergeResultsBatch(self, newItemsList, clientResponseItem)
GeneralResponse event object; represents a general state response for multipurpose usage.
def setEventHandler(self, eventType, handler)
Set the event handler, overwriting the current handler for eventType.
def onDCClientRoute(self, event)
def mergeResultsURLContent(self, newItemsList, clientResponseItem)
def onSitesManagerRoute(self, event)
def addConnection(self, name, connection)
This is the app base class for management of server connection end-points and parallel transport messages pr...
string SERVER_CONNECTION_NAME
def mergeResultsGeneralResponse(self, mergedGeneralResponse, clientResponseItem)
string CONFIG_BATCH_TASKS_MANAGER_REALTIME
def sortURLFetchResults(self, itemObject, criterion)
def send(self, connect_name, event)
send event
def registerEvent(self, event)
def mergeResultsURLFetch(self, newItemsList, clientResponseItem)
def mergeResultsData(self, eventType, eventObj, eventCookie)
string CONFIG_SERVER_PORT
def getUnixTimeFromString(self, buf, dateFormat='%Y-%m-%d %H:%M:%S', valueType=0)
string CONFIG_SITES_MANAGER
def unregisterEvent(self, event)
def mergeResultsSiteFields(self, siteToMerge, siteMergeWith)
def getRequestEvent(self, event)