2 HCE project, Python bindings, Distributed Tasks Manager application. 3 Event objects definitions. 7 @author Oleksii <developers.hce@gmail.com> 8 @link: http://hierarchical-cluster-engine.com/ 9 @copyright: Copyright © 2013-2014 IOIX Ukraine 10 @license: http://hierarchical-cluster-engine.com/license/ 15 from threading
import Lock
17 from sqlalchemy.sql
import select
18 from sqlalchemy
import create_engine
19 from sqlalchemy.orm
import sessionmaker
20 from sqlalchemy.engine
import reflection
22 from sqlalchemy
import text
24 import Constants
as CONSTANTS
39 MANDATORY_DBS = [
"ee_responses_table",
"resources_table",
"scheduler_task_scheme",
"task_back_log_scheme",
40 "task_log_scheme",
"tasks_data_table"]
49 engineString =
"mysql+mysqldb://%s:%s@%s" 50 engineString = engineString % (config_dic[
"db_user"], config_dic[
"db_pass"], config_dic[
"db_host"])
51 if "db_port" in config_dic:
52 engineString +=
':' + str(config_dic[
"db_port"])
53 engineString +=
'/' + config_dic[
"db_name"]
56 self.
engine = create_engine(engineString)
58 Session = sessionmaker(bind=self.
engine)
70 insp = reflection.Inspector.from_engine(self.
engine)
71 avlTables = insp.get_table_names()
73 if dbTable
not in avlTables:
74 raise Exception(
">>> %s table not present in database" % dbTable)
75 except Exception
as err:
76 self.
errorCode = CONSTANTS.DBI_COMMON_ERROR_CODE
77 self.
errorMessage = CONSTANTS.DBI_COMMON_ERROR_MSG +
": " + err.message
108 self.
errorCode = CONSTANTS.DBI_SUCCESS_CODE
111 if isinstance(tasks, collections.Iterable):
114 taskList.append(tasks)
119 for task
in taskList:
123 except sqlalchemy.exc.SQLAlchemyError, err:
124 logger.info(
"tasks: %s, type: %s", str(tasks), str(
type(tasks)))
125 self.
errorMessage = CONSTANTS.DBI_INSERT_ERROR_MSG +
": " + str(err)
128 except Exception, err:
129 logger.info(
"tasks: %s, type: %s", str(tasks), str(
type(tasks)))
130 self.
errorMessage = CONSTANTS.DBI_INSERT_ERROR_MSG +
": " + str(err)
134 self.
errorCode = CONSTANTS.DBI_INSERT_ERROR_CODE
150 self.
errorCode = CONSTANTS.DBI_SUCCESS_CODE
158 except sqlalchemy.exc.SQLAlchemyError
as err:
160 self.
errorCode = CONSTANTS.DBI_FETCH_ERROR_CODE
161 self.
errorMessage = CONSTANTS.DBI_FETCH_ERROR_MSG +
": " + str(err)
163 except Exception
as err:
164 self.
errorCode = CONSTANTS.DBI_FETCH_ERROR_CODE
165 self.
errorMessage = CONSTANTS.DBI_FETCH_ERROR_MSG +
": " + str(err)
179 def sql(self, obj, clause):
181 self.
errorCode = CONSTANTS.DBI_SUCCESS_CODE
189 except sqlalchemy.exc.SQLAlchemyError
as err:
191 self.
errorCode = CONSTANTS.DBI_SQL_ERROR_CODE
192 self.
errorMessage = CONSTANTS.DBI_SQL_ERROR_MSG +
": " + str(err) +
", clause:\n" + str(clause)
194 except Exception
as err:
195 self.
errorCode = CONSTANTS.DBI_SQL_ERROR_CODE
196 self.
errorMessage = CONSTANTS.DBI_SQL_ERROR_MSG +
": " + str(err) +
", clause:\n" + str(clause)
246 self.
errorCode = CONSTANTS.DBI_SUCCESS_CODE
250 connection = self.
engine.connect()
253 if isinstance(tasks, collections.Iterable):
256 taskList.append(tasks)
260 for task
in taskList:
262 fetched.append(connection.execute(select([
type(task)])).fetchall())
263 except sqlalchemy.exc.SQLAlchemyError, err:
265 self.
errorMessage = CONSTANTS.DBI_FETCH_ERROR_MSG +
": " + str(err)
267 except Exception, err:
268 self.
errorMessage = CONSTANTS.DBI_FETCH_ERROR_MSG +
": " + str(err)
272 self.
errorCode = CONSTANTS.DBI_FETCH_ERROR_CODE
278 raise DBIErr(CONSTANTS.DBI_FETCH_ERROR_CODE, str(e))
280 if connection
is not None:
296 self.
errorCode = CONSTANTS.DBI_SUCCESS_CODE
301 attributes = [attr
for attr
in dir(obj)
if not attr.startswith(
'__') \
302 and not attr.startswith(
'_')
and getattr(obj, attr)
is not None]
304 for attr
in attributes:
305 setattr(updated_task, attr, getattr(obj, attr))
306 updated.append(updated_task)
310 except sqlalchemy.exc.SQLAlchemyError
as err:
312 self.
errorCode = CONSTANTS.DBI_UPDATE_ERROR_CODE
313 self.
errorMessage = CONSTANTS.DBI_UPDATE_ERROR_MSG +
": " + str(err)
315 except Exception
as err:
316 self.
errorCode = CONSTANTS.DBI_UPDATE_ERROR_CODE
317 self.
errorMessage = CONSTANTS.DBI_UPDATE_ERROR_MSG +
": " + str(err)
332 self.
errorCode = CONSTANTS.DBI_SUCCESS_CODE
336 if isinstance(tasks, collections.Iterable):
339 taskList.append(tasks)
341 for task
in taskList:
347 except sqlalchemy.exc.SQLAlchemyError
as err:
349 self.
errorCode = CONSTANTS.DBI_INSERT_ERROR_CODE
350 self.
errorMessage = CONSTANTS.DBI_INSERT_ERROR_MSG +
": " + str(err)
352 except Exception
as err:
353 self.
errorCode = CONSTANTS.DBI_INSERT_ERROR_CODE
354 self.
errorMessage = CONSTANTS.DBI_INSERT_ERROR_MSG +
": " + str(err)
369 self.
errorCode = CONSTANTS.DBI_SUCCESS_CODE
374 deleted.append(deleted_task)
379 except sqlalchemy.exc.SQLAlchemyError
as err:
381 self.
errorCode = CONSTANTS.DBI_DELETE_ERROR_CODE
382 self.
errorMessage = CONSTANTS.DBI_DELETE_ERROR_MSG +
": " + str(err)
384 except Exception
as err:
385 self.
errorCode = CONSTANTS.DBI_DELETE_ERROR_CODE
386 self.
errorMessage = CONSTANTS.DBI_DELETE_ERROR_MSG +
": " + str(err)
438 self.
errorCode = CONSTANTS.DBI_SUCCESS_CODE
441 connection = self.
engine.connect()
444 if isinstance(tasks, collections.Iterable):
447 taskList.append(tasks)
451 for task
in taskList:
454 except sqlalchemy.exc.SQLAlchemyError, err:
456 self.
errorMessage = CONSTANTS.DBI_DELETE_ERROR_MSG +
": " + str(err)
458 except Exception, err:
459 self.
errorMessage = CONSTANTS.DBI_DELETE_ERROR_MSG +
": " + str(err)
463 self.
errorCode = CONSTANTS.DBI_DELETE_ERROR_CODE
469 raise DBIErr(CONSTANTS.DBI_DELETE_ERROR_CODE, str(e))
471 if connection
is not None:
484 self.
errorCode = CONSTANTS.DBI_SUCCESS_CODE
487 sqlText =
text(clause)
489 if result
is not None:
490 customResponse = result.fetchall()
494 except sqlalchemy.exc.SQLAlchemyError
as err:
496 self.
errorCode = CONSTANTS.DBI_SQL_ERROR_CODE
497 self.
errorMessage = CONSTANTS.DBI_SQL_ERROR_MSG +
": " + err.message
499 except Exception
as err:
500 self.
errorCode = CONSTANTS.DBI_SQL_ERROR_CODE
501 self.
errorMessage = CONSTANTS.DBI_SQL_ERROR_MSG +
": " + err.message
504 return customResponse
518 insp = reflection.Inspector.from_engine(engine)
519 localColumns = insp.get_columns(obj.__tablename__)
520 for columName
in [elem[
"name"]
for elem
in localColumns]:
521 if columName
in obj.__dict__:
522 criterions[columName] = obj.__dict__[columName]
523 if len(criterions) > 0:
524 ret = session.query(
type(obj)).filter_by(**criterions)
534 Exception.__init__(self, message)
def __init__(self, errCode, message)
def insertOnUpdate(self, tasks, clause)
def fetch(self, obj, clause)
def execute(self, commands, nodes)
execute method execute incoming commands on nodes, keepts reult in responses and responsesDicts field...
def deleteAll(self, tasks)
def makeQueryFromObj(obj, engine, session)
def update(self, obj, clause)
def delete(self, obj, clause)
def sqlCustom(self, clause)
def sql(self, obj, clause)
def fetchAll(self, tasks)
def __init__(self, config_dic)