3 HCE project, Python bindings, Distributed Tasks Manager application. 4 Event objects definitions. 7 @author bgv bgv.hce@gmail.com 8 @link: http://hierarchical-cluster-engine.com/ 9 @copyright: Copyright © 2013-2014 IOIX Ukraine 10 @license: http://hierarchical-cluster-engine.com/license/ 18 from datetime
import datetime
28 logger = logging.getLogger(DTM_CONSTS.LOGGER_NAME)
36 FILE_ACTION_CREATE_BEFORE = 1
37 FILE_ACTION_DELETE_BEFORE = 2
38 FILE_ACTION_READ_AFTER = 4
39 FILE_ACTION_DELETE_AFTER = 8
40 FILE_ACTION_BASE64_ENCODED = 2147483648
42 STRATEGY_DATE =
"DATE" 43 STRATEGY_DATE_MAX =
"DATE_MAX" 44 STRATEGY_DATE_SHIFT =
"DATE_SHIFT" 46 STRATEGY_RAM_FREE =
"RAM_FREE" 48 STRATEGY_DISK_FREE =
"DISK_FREE" 49 STRATEGY_DISK =
"DISK" 50 STRATEGY_TIME =
"TIME" 51 STRATEGY_THREADS =
"THREADS" 52 STRATEGY_SDELAY =
"SDELAY" 53 STRATEGY_RDELAY =
"RDELAY" 54 STRATEGY_RETRY =
"RETRY" 55 STRATEGY_PRIORITY =
"PRIORITY" 56 STRATEGY_CPU_LOAD_MAX =
"CPU_LOAD_MAX" 57 STRATEGY_IO_WAIT_MAX =
"IO_WAIT_MAX" 58 STRATEGY_autoCleanupFields =
"autoCleanupFields" 60 STRATEGY_AUTOCLEANUP_TTL =
"TTL" 61 STRATEGY_AUTOCLEANUP_DELETE_TYPE =
"DeleteType" 62 STRATEGY_AUTOCLEANUP_DELETE_RETRIES =
"DeleteRetries" 63 STRATEGY_AUTOCLEANUP_SSTATE =
"State" 68 TIME_MAX_DEFAULT = 60000
117 self.
session[sessionVarName] = sessionVarValue
126 self.
strategy[strategyVarName] = strategyVarValue
134 self.
files.append(fileItem)
143 self.
limits[limitsVarName] = limitsVarValue
158 def __init__(self, taskCommandLine, taskId=None, name=None):
193 return json.dumps(self, default=
lambda o: o.__dict__, sort_keys=
True, indent=4)
268 return json.dumps(self, default=
lambda o: o.__dict__, sort_keys=
True, indent=4)
283 return json.dumps(self, default=
lambda o: o.__dict__, sort_keys=
True, indent=4)
292 LOG_STRATEGY =
"CHECK_LOG" 295 FILTER_CDATE_FROM =
"CDATE_FROM" 296 FILTER_CDATE_TO =
"CDATE_TO" 297 FILTER_SDATE_FROM =
"SDATE_FROM" 298 FILTER_SDATE_TO =
"SDATE_TO" 299 FILTER_RDATE_FROM =
"RDATE_FROM" 300 FILTER_RDATE_TO =
"RDATE_TO" 301 FILTER_FDATE_FROM =
"FDATE_FROM" 302 FILTER_FDATE_TO =
"FDATE_TO" 303 FILTER_INPROGRESS_TIME_FROM =
"INPROGRESS_TIME_FROM" 304 FILTER_INPROGRESS_TIME_TO =
"INPROGRESS_TIME_TO" 305 FILTER_INPROGRESS_TIME_MAX_FROM =
"INPROGRESS_TIME_MAX_FROM" 306 FILTER_INPROGRESS_TIME_MAX_TO =
"INPROGRESS_TIME_MAX_TO" 307 FILTER_INPROGRESS =
"INPROGRESS" 308 FILTER_SCHEDULED =
"SCHEDULED" 309 FILTER_RUNNING =
"RUNNING" 310 FILTER_FINISHED =
"FINISHED" 311 FILTER_TERMINATED =
"TERMINATED" 312 FILTER_CRASHED =
"CRASHED" 313 FILTER_RRAM_FROM =
"RRAM_FROM" 314 FILTER_RRAM_TO =
"RRAM_TO" 315 FILTER_VRAM_FROM =
"VRAM_FROM" 316 FILTER_VRAM_TO =
"VRAM_TO" 317 FILTER_CPU_FROM =
"CPU_FROM" 318 FILTER_CPU_TO =
"CPU_TO" 343 self.
filters[filterVarName] = filterVarValue
352 self.
strategy[strategyVarName] = strategyVarValue
378 return json.dumps(self, default=
lambda o: o.__dict__, sort_keys=
True, indent=4)
404 return json.dumps(self, default=
lambda o: o.__dict__, sort_keys=
True, indent=4)
433 RESPONSE_CODE_DBI_ERROR = 1
434 RESPONSE_CODE_UNKNOWN_ERROR = 2
435 RESPONSE_CODE_DRCE_ERROR = 3
437 TERMINATE_ALG_DEFAULT = 1
438 TERMINATE_ALG_CUSTOM = 2
440 TERMINATE_DELAY_DEFAULT = 1000
441 TERMINATE_REPEAT_DEFAULT = 3
443 TERMINATE_SIGTERM = 15
444 TERMINATE_SIGKILL = 9
446 ACTION_DELETE_TASK_DATA = 0
447 ACTION_TERMINATE_TASK_AND_DELETE_DATA = 1
448 ACTION_TERMINATE_TASK_ONLY = 2
449 ACTION_DELETE_ON_DTM = 3
451 DEFAULT_TASK_NAME =
"DELETE" 463 self.
id = ctypes.c_uint32(zlib.crc32(idGenerator.get_connection_uid(), int(time.time()))).value
505 return json.dumps(self, default=
lambda o: o.__dict__, sort_keys=
True, indent=4)
570 def __init__(self, errorCode=ERROR_OK, errorMessage=""):
582 return json.dumps(self, default=
lambda o: o.__dict__, sort_keys=
True, indent=4)
656 ERROR_CODE_TIMEOUT = -1
659 ERROR_CODE_TASK_NOT_FOUND = 3
660 ERROR_CODE_BAD_ID = 14
663 ERROR_MESSAGE_OK =
"" 664 ERROR_MESSAGE_TIMEOUT =
"Request timeout reached!" 665 ERROR_MESSAGE_TASK_NOT_FOUND =
"Task not found in queue!" 668 REQUEST_TYPE_CHECK = 1
669 REQUEST_TYPE_DELETE = 2
672 TASK_STATE_FINISHED = 0
673 TASK_STATE_IN_PROGRESS = 1
675 TASK_STATE_NOT_FOUND = 3
676 TASK_STATE_TERMINATED = 4
677 TASK_STATE_CRASHED = 5
678 TASK_STATE_SET_ERROR = 6
679 TASK_STATE_UNDEFINED = 7
680 TASK_STATE_TERMINATED_BY_DRCE_TTL = 11
681 TASK_STATE_SCHEDULED_TO_DELETE = 100
682 TASK_STATE_DELETED = 101
683 TASK_STATE_NEW_DATA_STORED = 102
684 TASK_STATE_NEW_SCHEDULED = 103
685 TASK_STATE_CLEANED = 104
686 TASK_STATE_NEW_JUST_CREATED = 105
687 TASK_STATE_SCHEDULE_TRIES_EXCEEDED = 106
688 TASK_STATE_RUN_TRIES_EXCEEDED = 107
745 return json.dumps(self, default=
lambda o: o.__dict__, sort_keys=
True, indent=4)
753 self.
files.append(fileItem)
770 def __init__(self, taskId, rTime, rTimeMax, state, priority):
802 if scheduledTasks
is not None:
803 self.
tasks = scheduledTasks
831 self.
state = ScheduledTask.STATE_PLANNED
961 FIELD_CLIENTS_LIST =
"clients" 975 if statFieldsDic
is not None:
976 self.
fields = statFieldsDic
980 return json.dumps(self, default=
lambda o: o.__dict__, sort_keys=
True, indent=4)
993 def __init__(self, className, configFieldsList=None):
1000 if configFieldsList
is not None:
1001 self.
fields = configFieldsList
1005 return json.dumps(self, default=
lambda o: o.__dict__, sort_keys=
True, indent=4)
1018 STATE_TRANSACTION_ROLLBACK = 6
1036 return json.dumps(self, default=
lambda o: o.__dict__, sort_keys=
True, indent=4)
1063 return json.dumps(self, default=
lambda o: o.__dict__, sort_keys=
True, indent=4)
1072 TASK_STATE_ERROR = 2000
1073 TASK_STATE_ERROR_MESSAGE =
"Wrong task state to cleanup task's data!" 1074 TASK_NOT_FOUND_ERROR = 2001
1075 TASK_NOT_FOUND_ERROR_MESSAGE =
"Task not found!" 1076 EMPRY_RAW_ERROR = 2002
1077 EMPRY_RAW_ERROR_MESSAGE =
"Empty json raw!" 1079 DRCE_ERROR_MESSAGE =
"Some DRCE Error!" 1096 TABLE_NAME_DEFAULT =
"task_back_log_scheme" 1103 def __init__(self, fetchNum, fetchAdditionalFields=False, criterions=None, tableName=TABLE_NAME_DEFAULT):
1104 super(FetchAvailabelTaskIds, self).
__init__()
1116 if app.SQLCriterions.CRITERION_WHERE
not in self.
criterions or \
1117 self.
criterions[app.SQLCriterions.CRITERION_WHERE]
is None:
1118 self.
criterions[app.SQLCriterions.CRITERION_WHERE] =
"deleteTaskId = 0" 1119 if app.SQLCriterions.CRITERION_ORDER
not in self.
criterions or \
1120 self.
criterions[app.SQLCriterions.CRITERION_ORDER]
is None:
1121 self.
criterions[app.SQLCriterions.CRITERION_ORDER] =
"rDate" 1122 if app.SQLCriterions.CRITERION_LIMIT
not in self.
criterions or \
1123 self.
criterions[app.SQLCriterions.CRITERION_LIMIT]
is None:
1135 super(AvailableTaskIds, self).
__init__()
1153 return json.dumps(self, default=
lambda o: o.__dict__, sort_keys=
True, indent=4)
UpdateTask event object, for update task field operation.
repeat
The repeat of task termination in the EE.
def __init__(self, taskId)
constructor initialize task's fields
filters
The filters criterion.
UpdateTaskFields event object, for update task fields operation.
errorMessage
The error message of request operation filled depends on.
session
The task session items init.
def __init__(self, taskId)
constructor initialize task's fields
def __init__(self, taskId)
constructor initialize task id field
errorCode
The EE response errorCode.
autoCleanupFields
The task autoCleanupFields init.
DeleteTaskResults event object, for delete task results from DTM application operation.
def __init__(self, taskId, checkType=TYPE_SIMPLE)
constructor initialize task's fields
def fillCriterions(self)
fillCriterions default initialize criterions method
alg
The algorithm of task termination in the EE.
def __init__(self, taskId)
constructor initialize task id field
strategy
The task strategy items init.
def __init__(self, taskId)
constructor initialize response data fields
stderr
The task process stderr.
def setStrategyVar(self, strategyVarName, strategyVarValue)
Set the strategy variable.
def __init__(self, taskId, rTime, rTimeMax, state, priority)
constructor initialize task data fields
def __init__(self, className, command)
constructor initialize task's fields
uDate
The information update date.
cpu
The node AVG CPU LA, %.
rTimeMax
The time to run max.
priority
The priority of the task in the schedule.
state
The state of a task in the schedule.
ExecuteTask event object, to set task to execute on EE.
def setLimitsVar(self, limitsVarName, limitsVarValue)
Set the limits variable.
NewTask event object, defines the Task object fields.
GeneralResponse event object, represents general state response for multipurpose usage.
command
the command code for state operation
AvailableTaskIds event object, for returning all available task ids.
nodePort
The task executor HCE node port.
def setStrategyVar(self, strategyVarName, strategyVarValue)
Set the strategy variable.
int TERMINATE_ALG_DEFAULT
def __init__(self, taskId)
constructor initialize task's field
UpdateScheduledTasks object used to represent task's related data update from the TasksManager to the...
rTime
The time to run, msec.
FetchTasksResultsFromCache event object, for fetching task's results data from the DTM application.
def __init__(self, stype=0, data=None)
constructor initialize task's fields
files
The task files items attached init.
DeleteTaskData event object, to delete task's data in the storage.
FetchTasksResults event object, for fetch task's results data from EE.
files
The files list attached to the task.
port
The TCP port of the HCE node admin interface for the task process in the EE.
int ACTION_DELETE_TASK_DATA
DeleteTask event object, to delete task from DTM application and from EE.
strategy
The task strategy items attached.
def __init__(self, nodeId)
constructor initialize resource data fields
def __init__(self, className, statFieldsDic=None)
constructor initialize task's fields
def __init__(self, taskId)
constructor initialize task id field
int TERMINATE_DELAY_DEFAULT
AdminConfigVars event object, for admin set or get config variables from any of threaded classes or a...
int TERMINATE_REPEAT_DEFAULT
AvailableTaskIds event object, for returning all available task ids.
disk
The node AVG disk usage, %.
def __init__(self, deleteTaskId, taskId=None)
constructor initialize task's fields
def setFile(self, fileItem)
Set the file item.
input
The task stdin stream buffer for the EE process.
ResourcesAVG event object, represents summary of the EE resources utilization.
def __init__(self, taskId)
constructor initialize task's fields
state
The node host state.
Resource event object, represents resource's data fields.
exitStatus
The task process exit status.
swap
The node swap total, byte.
CheckTaskState event object, for check task status inside EE.
def setFile(self, fileItem)
Set the file item.
def __init__(self, scheduledTasks=None)
constructor initialize task data fields
pId
The task process Id in EE.
def __init__(self, taskId)
constructor initialize task's fields
ScheduledTask event object, represents task's data fields in the Schedule container.
taskIdMax
The task Id max.
AdminState event object, for admin manage change application state commands, like shutdown...
command
The task command line to execute inside EE.
rTimeMax
The time to run max.
def __init__(self, timeSlotSize)
constructor initialize task's field
def __init__(self)
constructor initialize criterion fields
ramVU
The node virtual RAM usage, byte.
taskTime
The task execution time.
GetTaskManagerFields event object, for get task fields values operation.
GetScheduledTasks event object, to get tasks per time slot range from the Scheduler.
FetchEEResponseData event object, to fetch EE response data from the storage.
TasksStatus event object, returns task status operation.
delay
The delay of task termination in the EE.
def __init__(self, taskCommandLine, taskId=None, name=None)
constructor initialize task's fields
def __init__(self, ids, tasks=None)
constructor initialize task's fields
disk
The node disk total, byte.
Task event object, defines the Task object fields.
nodeId
The node host name + port string in format "host:port".
nodeHost
The task executor HCE node host name.
errorMessage
The EE response error message.
GetScheduledTask event object, defines criterion to select tasks from the schedule.
DeleteEEResponseData event object, to delete EE response data from the storage.
state
The state of a task in the schedule.
def __init__(self, errorCode=ERROR_OK, errorMessage="")
constructor initialize response fields
def __init__(self, taskId, fetchType=TYPE_DELETE)
constructor initialize task's fields
ramRU
The node resource RAM usage, byte.
AdminStatData event object, for admin fetch stat fields and possible data from any threaded classes i...
ramR
The node AVG resource RAM usage, %.
def __init__(self, suspendType=SUSPEND)
constructor initialize task's fields
def setSessionVar(self, sessionVarName, sessionVarValue)
Set the OS session variable for EE process.
TaskManagerFields event object, for return task fields values.
FetchTaskData event object, to fetch task data from the storage.
def __init__(self, taskId)
constructor initialize task's fields
ramV
The node AVG virtual RAM usage, %.
requestTime
The EE request time.
ramV
The node virtual RAM total, byte.
AdminSuspend event object, for admin suspend command.
GetTasksStatus event object, for check task status operation.
diskU
The node disk usage, byte.
fields
The stat fields to fetch, if empty - all fields pairs returned.
def __init__(self, idsList)
constructor initialize task's fields
ids
The task Ids list self.ids = [].
host
The host name or IP address of task process HCE node in the EE.
rTimeMin
The time to run min.
ramR
The node resource RAM total, byte.
limits
The task limits init.
ramRU
The node AVG resource RAM usage, %.
def __init__(self)
constructor initialize task's fields
def __init__(self, fetchNum, fetchAdditionalFields=False, criterions=None, tableName=TABLE_NAME_DEFAULT)
constructor initialize task's fields
def __init__(self)
constructor initialize resource AVG fields
stdout
The task process stdout.
FetchAvailabelTaskIds event object, for fetching available task ids.
IDGenerator is used to generate unique id for connections.
priorityMax
The priority max.
EEResponseData event object, store task results data, returned from EE.
statuses
The list of statuses in case of request used for group of objects or actions.
def getHash(strBuf, binSize=32, digestType=0, fixedMode=0, valLimit=18446744073709552000L)
uDate
The information update date oldest.
swap
The node AVG swap usage, %.
def __init__(self, taskId)
constructor initialize task's fields
def __init__(self, idsList)
constructor initialize task's field
def setFilterVar(self, filterVarName, filterVarValue)
Set the filter variable.
ramVU
The node AVG virtual RAM usage, %.
def __init__(self, taskId)
constructor initialize task id field
nodeName
The task executor HCE node name in EE.
signal
The UNIX signal used to terminate the task process in the EE.
swapU
The node swap usage, byte.
taskIdMin
The task Id min.
def __init__(self, className, configFieldsList=None)
constructor initialize task's fields
GetScheduledTasksResponse event object, to return list of task from the Scheduler.
errorCode
The errorCode of request operation, zero means success.