2 HCE project, Python bindings, Distributed Crawler application. 3 BatchTasksManagerRealTime object and related classes definitions. 6 @author bgv bgv.hce@gmail.com 7 @link: http://hierarchical-cluster-engine.com/ 8 @copyright: Copyright © 2014 IOIX Ukraine 9 @license: http://hierarchical-cluster-engine.com/license/ 21 import cPickle
as pickle
28 from dc
import EventObjects
34 from drce.DRCEManager import ConnectionTimeout, TransportInternalErr, CommandExecutorErr
44 lock = threading.Lock()
53 DRCE_REDUCER_TTL = 3000000
54 REQUEST_ERROR_OBJECT_TYPE = 1
55 REQUEST_ERROR_URLS_COUNT = 2
56 REQUEST_ERROR_THREADS_NUMBER_EXCEEDED = 3
57 CONFIG_DRCE_REQUEST_ROUTING_DEFAULT = 1
58 CONFIG_BATCH_MAX_ITERATIONS_DEFAULT = 2
61 CONFIG_SERVER =
"server" 63 CONFIG_DRCE_STARTER_NAME =
"DRCEStarterName" 64 CONFIG_DRCE_HOST =
"DRCEHost" 65 CONFIG_DRCE_PORT =
"DRCEPort" 66 CONFIG_DRCE_TIMEOUT =
"DRCETimeout" 67 CONFIG_DRCE_CRAWLER_APP_NAME =
"DRCECrawlerAppName" 68 CONFIG_BATCH_MAX_TIME =
"BatchMaxExecutionTime" 69 CONFIG_BATCH_MAX_URLS =
"BatchMaxURLs" 70 CONFIG_MAX_THREADS =
"MaxThreads" 71 CONFIG_POLLING_TIMEOUT =
"PollingTimeout" 72 CONFIG_DRCE_REQUEST_ROUTING =
"DRCERequestRouting" 73 CONFIG_BATCH_MAX_ITERATIONS =
"BatchMaxIterations" 75 REAL_TIME_CRAWL_THREAD_NAME_PREFIX =
'RtCrawl_' 83 def __init__(self, configParser, connectionBuilderLight=None):
84 super(BatchTasksManagerRealTime, self).
__init__()
87 if connectionBuilderLight
is None:
91 className = self.__class__.__name__
95 serverConnection = connectionBuilderLight.build(TRANSPORT_CONSTS.SERVER_CONNECT, self.
serverName)
164 main_thread = threading.currentThread()
165 for t
in threading.enumerate():
168 if self.
statFields[DC_CONSTS.BATCHES_REALTIME_THREADS_NAME] > n:
184 clientResponseObj.errorMessage =
"Wrong requested object type " + str(len(event.eventObj.items)) + \
188 logger.info(
"Single thread processing started")
197 logger.info(
"Forking new thread")
198 self.
updateStatField(DC_CONSTS.BATCHES_REALTIME_THREADS_CREATED_COUNTER_NAME, 1,
200 t1 = threading.Thread(target=self.
forkBatch, args=(logging, event,))
202 str(self.
statFields[DC_CONSTS.BATCHES_REALTIME_THREADS_CREATED_COUNTER_NAME]))
204 logger.info(
"New thread forked")
213 clientResponseObj.errorMessage =
"Service overloaded, " + \
214 str(self.
statFields[DC_CONSTS.BATCHES_REALTIME_THREADS_NAME]) +
" workers." 215 logger.error(clientResponseObj.errorMessage)
221 clientResponseObj.errorMessage =
"STUB fake error response" 222 logger.info(clientResponseObj.errorMessage)
227 clientResponseObj.errorMessage =
"Wrong requested object type " +
type(event.eventObj) +
", Batch expected." 228 logger.error(clientResponseObj.errorMessage)
248 logger = loggingObj.getLogger(DC_CONSTS.LOGGER_NAME)
251 logger.info(
"THREAD_STARTED")
252 logger.debug(
"event:\n%s", Utils.varDump(event))
256 event.eventObj.crawlerType = EventObjects.Batch.TYPE_REAL_TIME_CRAWLER
257 event.eventObj.dbMode = EventObjects.Batch.DB_MODE_R
266 logger.debug(
"ClientResponseObj object:\n" + Utils.varDump(clientResponseObj))
270 str(self.
totalTime / float(1 + self.
statFields[DC_CONSTS.BATCHES_CRAWL_COUNTER_TOTAL_NAME])),
274 logger.info(
"THREAD_FINISHED")
275 logger.debug(
"clientResponseObj:\n%s", Utils.varDump(clientResponseObj))
277 except Exception
as err:
278 msg =
"Thread exception:" + str(err)
283 clientResponseObj.errorMessage = msg
287 msg =
"Unknown thread exception!" 292 clientResponseObj.errorMessage = msg
308 replyEvent = self.
eventBuilder.build(DC_CONSTS.EVENT_TYPES.BATCH_RESPONSE, clientResponseObj)
310 self.
reply(clientRequestEvent, replyEvent)
311 logger.info(
"Response to client sent")
323 taskId = ctypes.c_uint32(zlib.crc32(idGenerator.get_connection_uid(), int(time.time()))).value
330 if eventObj.maxExecutionTime == 0:
333 mt = eventObj.maxExecutionTime
336 logger.debug(
"Custom max DRCE task execution set: %s", str(mt))
337 taskExecuteStruct.input = pickle.dumps(eventObj)
338 taskExecuteStruct.session =
Session(Session.TMODE_SYNC, 0, int(mt) * 1000)
340 logger.debug(
"DRCE taskExecuteStruct:\n" + Utils.varDump(taskExecuteStruct))
345 taskExecuteRequest.data = taskExecuteStruct
347 if len(eventObj.items) < 2:
349 taskExecuteRequest.route = DC_CONSTS.DRCE_REQUEST_ROUTING_RESOURCE_USAGE
351 taskExecuteRequest.route = DC_CONSTS.DRCE_REQUEST_ROUTING_ROUND_ROBIN
353 taskExecuteRequest.route = DC_CONSTS.DRCE_REQUEST_ROUTING_MULTICAST
355 taskExecuteRequest.route = DC_CONSTS.DRCE_REQUEST_ROUTING_RND
357 logger.debug(
"DRCE taskExecuteRequest:\n" + Utils.varDump(taskExecuteRequest))
359 return taskExecuteRequest
368 logger.info(
"Sending sync task id:" + str(taskExecuteRequest.id) +
" to DRCE router!")
371 logger.info(
"Received response on sync task from DRCE router!")
372 logger.debug(
"Response: %s", Utils.varDump(response))
379 clientResponse.errorCode = EventObjects.ClientResponse.STATUS_ERROR_NONE
380 clientResponse.errorMessage =
"Response error, None returned from DRCE, possible timeout " + \
382 logger.error(clientResponse.errorMessage)
387 if len(response.items) == 0:
388 clientResponse.errorCode = EventObjects.ClientResponse.STATUS_ERROR_EMPTY_LIST
389 clientResponse.errorMessage =
"Response error, empty list returned from DRCE, possible no one node in cluster!" 390 logger.error(clientResponse.errorMessage)
395 for item
in response.items:
399 if item.error_code > 0
or item.exit_status > 0:
400 clientResponseItem.errorCode = clientResponseItem.STATUS_ERROR_DRCE
401 clientResponseItem.errorMessage =
"error_message=" + item.error_message + \
402 ", error_code=" + str(item.error_code) + \
403 ", exit_status=" + str(item.exit_status) + \
404 ", stderror=" + str(item.stderror)
405 logger.error(clientResponseItem.errorMessage)
412 clientResponseItem.itemObject = pickle.loads(item.stdout)
413 if clientResponseItem.itemObject
is not None and isinstance(clientResponseItem.itemObject, list):
414 urlContents = len(clientResponseItem.itemObject)
416 self.
updateStatField(DC_CONSTS.BATCHES_CRAWL_COUNTER_URLS_NAME, urlContents,
419 except Exception
as e:
420 clientResponseItem.errorCode = EventObjects.ClientResponseItem.STATUS_ERROR_RESTORE_OBJECT
421 clientResponseItem.errorMessage = EventObjects.ClientResponseItem.MSG_ERROR_RESTORE_OBJECT +
"\n" + \
422 str(e.message) +
"\nstdout=" + str(item.stdout) + \
423 ", stderror=" + str(item.stderror)
424 logger.error(clientResponseItem.errorMessage)
429 clientResponseItem.id = item.id
430 clientResponseItem.host = item.host
431 clientResponseItem.port = item.port
432 clientResponseItem.node = item.node
433 clientResponseItem.time = item.time
435 clientResponse.itemsList.append(clientResponseItem)
437 return clientResponse
456 except (ConnectionTimeout, TransportInternalErr, CommandExecutorErr)
as err:
458 logger.error(
"DRCE router send error : " + str(err.message))
460 logger.info(
"DRCE router sent!")
463 drceManager.clear_host()
def reply(self, event, reply_event)
wrapper for sending event in reply for event
def __init__(self, configParser, connectionBuilderLight=None)
string CONFIG_BATCH_MAX_URLS
int CONFIG_DRCE_REQUEST_ROUTING_DEFAULT
int REQUEST_ERROR_THREADS_NUMBER_EXCEEDED
string CONFIG_POLLING_TIMEOUT
string CONFIG_DRCE_TIMEOUT
int CONFIG_BATCH_MAX_ITERATIONS_DEFAULT
def updateStatField(self, field_name, value, operation=STAT_FIELDS_OPERATION_ADD)
update values of stat field - default sum
wrapper for TaskExecuteStruct
string CONFIG_DRCE_CRAWLER_APP_NAME
string POLL_TIMEOUT_CONFIG_VAR_NAME
string CONFIG_BATCH_MAX_TIME
def setEventHandler(self, eventType, handler)
set event handler rewrite the current handler for eventType
def sendClientResponse(self, clientRequestEvent, clientResponseObj)
def addConnection(self, name, connection)
def prepareDRCERequest(self, eventObj)
This is app base class for management server connection end-points and parallel transport messages pr...
string REAL_TIME_CRAWL_THREAD_NAME_PREFIX
int STAT_FIELDS_OPERATION_SET
wrapper for Session fields array of execute task
int STAT_FIELDS_OPERATION_ADD
UIDGenerator is used to generate unique message id.
string CONFIG_BATCH_MAX_ITERATIONS
Class hides routines of bulding connection objects.
statFields
stat fields container
string CONFIG_DRCE_REQUEST_ROUTING
def processDRCERequest(self, taskExecuteRequest)
def sendToDRCERouter(self, request)
def on_poll_timeout(self)
def forkBatch(self, loggingObj, event)
string CONFIG_DRCE_STARTER_NAME
int REQUEST_ERROR_URLS_COUNT
int STAT_FIELDS_OPERATION_SUB
int REQUEST_ERROR_OBJECT_TYPE
string CONFIG_MAX_THREADS
IDGenerator is used to generate unique id for connections.
Convertor which used to convert Task*Reques to json and TaskResponse from json.
int STAT_FIELDS_OPERATION_INIT
def onEventsHandler(self, event)