HCE Project DC service web UI  0.2
Hierarchical Cluster Engine DC service web UI
 All Classes Namespaces Files Functions Variables Pages
JobManager.php
Go to the documentation of this file.
1 <?php
2 
3 class JobManager extends CApplicationComponent
4 {
/**
 * Job definitions processed by syncJobs(): each entry is an attribute array
 * that is checked against the process limit and then handed to
 * addCronTabJobFromArray().
 */
10  public $jobs = array();
11 
/**
 * Ids of application components that clearComponents() resets by calling
 * Yii::app()->setComponent($id, null).
 */
17  public $clearComponents = array();
18 
/**
 * When true, runJobs() starts a job with executeAsync(); otherwise it calls
 * execute().
 */
24  public $executeAsync = true;
25 
/**
 * Path alias of this component. It is not referenced by the code visible in
 * this class; presumably intended as a log category (TODO confirm).
 */
31  protected static $logClass = "application.modules.job.components.JobManager";
32 
/**
 * Store a job so that it can be picked up for execution later.
 *
 * With duplicate checking enabled the job is not saved when
 * findEnqueuedDuplicateJob() reports a match for it.
 *
 * @param Job  $job                    job to store
 * @param bool $checkEnqueuedDuplicate whether to look for an enqueued duplicate first
 *
 * @return bool false when a duplicate was found or saving failed, true otherwise
 */
public function addJob(Job $job, $checkEnqueuedDuplicate = true)
{
    // Short-circuit keeps the lookup from running when the check is disabled.
    $hasDuplicate = $checkEnqueuedDuplicate && $this->findEnqueuedDuplicateJob($job);

    return $hasDuplicate ? false : $this->saveJob($job);
}
51 
/**
 * Run a job immediately through its executeDirect() method.
 *
 * @param Job   $job job to execute
 * @param mixed $id  when non-empty, assigned to the job's id before execution
 *
 * @throws Exception when executeDirect() does not return JobStatus::SUCCESS
 */
public function execJob(Job $job, $id = false)
{
    $jobClass = get_class($job);
    Yii::trace("Exec job directly, class = ".$jobClass);

    if ($id) {
        $job->id = $id;
    }

    if ($job->executeDirect() == JobStatus::SUCCESS) {
        return;
    }

    // The finish message is only fetched on the failure path.
    $finishMessage = print_r($job->getFinishMessage(), true);
    throw new Exception("Job execution was not successful: ".$finishMessage);
}
71 
{
    // Builds a job model from an attribute array: "class" names the model
    // class to instantiate, "job_data" (optional) is passed to setJobData(),
    // and every remaining entry is passed to setAttributes().
    if (!isset($attributes['class'])) {
        throw new Exception('Job needs to define a class');
    }

    $jobClass = $attributes['class'];
    unset($attributes['class']);

    // "job_data" is only removed from the attributes when it is set and not null.
    if (isset($attributes['job_data'])) {
        $jobData = $attributes['job_data'];
        unset($attributes['job_data']);
    } else {
        $jobData = array();
    }

    $job = new $jobClass();
    $job->job_class = $jobClass;
    $job->setAttributes($attributes);
    $job->setJobData($jobData);

    return $job;
}
101 
{
    // Both keys are mandatory; they are checked in this order so the first
    // missing one determines the exception message.
    $required = array(
        'class' => 'Job needs to define a class',
        'crontab' => 'Job needs to define a crontab value',
    );
    foreach ($required as $key => $message) {
        if (!isset($attributes[$key])) {
            throw new Exception($message);
        }
    }

    $model = $this->createJobFromArray($attributes);

    // Stored jobs of the same class that have a crontab value.
    $storedJobs = Job::model()->findAll(
        't.job_class=:job_class AND t.crontab IS NOT NULL',
        array(':job_class' => $attributes['class'])
    );

    // Continue with the first stored job the new model is a duplicate of.
    foreach ($storedJobs as $storedJob) {
        if ($model->isDuplicateOf($storedJob)) {
            $model = $storedJob;
            break;
        }
    }

    // A job that is currently running is returned untouched so it is not broken.
    if ($model->job_status_id != JobStatus::RUNNING) {
        if ($attributes['crontab'] != $model->crontab) {
            Yii::trace("Setting new time");
            $model->calculateNextPlannedTime();
        }

        $this->addJob($model, true);
    }

    return $model;
}
144 
/**
 * Synchronise the configured job definitions with the stored jobs.
 *
 * Every entry of $jobs that passes __checkLimitsProcesses() is handed to
 * addCronTabJobFromArray(); entries that do not pass are skipped.
 */
public function syncJobs()
{
    foreach ($this->jobs as $attributes) {
        if ($this->__checkLimitsProcesses($attributes)) {
            // The returned model is not needed here; the ids that used to be
            // collected from it were never read.
            $this->addCronTabJobFromArray($attributes);
        }
    }
}
158 
/**
 * Execute every enqueued job whose planned time has been reached.
 *
 * One job at a time is selected with SELECT ... FOR UPDATE inside a
 * transaction; onBeforeExecute is raised and the job's beforeExecute() is
 * called before the transaction is committed. The job is then run with
 * executeAsync() or execute() depending on $executeAsync, onAfterExecute is
 * raised, and the next due job is looked up. Processing stops when no due
 * job is found or when selecting/preparing a job fails.
 */
public function runJobs()
{
    // Iterate instead of recursing so a long backlog cannot exhaust the call stack.
    do {
        $job = null;
        $tx = Yii::app()->db->beginTransaction();

        try {
            // Values are bound as parameters rather than interpolated into the SQL text.
            $sql = "SELECT * FROM job WHERE planned_time <= :now AND job_status_id = :job_status_id"
                ." LIMIT 1 FOR UPDATE";

            $job = Job::model()->findBySql($sql, array(
                ':now' => $this->timestampToDatabaseDate(),
                ':job_status_id' => JobStatus::ENQUEUED,
            ));

            if ($job) {
                $this->onBeforeExecute(new CEvent($this, array("job" => $job)));
                $job->beforeExecute();
            }

            $tx->commit();
        } catch (Exception $ex) {
            $tx->rollback();
            Yii::log("Error in finding job: ".$ex->getMessage(), 'error');

            // The transaction was rolled back, so a job fetched before the
            // failure must not be executed; previously it was still run and
            // could then be selected again by the next lookup.
            $job = null;
        }

        if ($job) {
            if ($this->executeAsync) {
                $job->executeAsync();
            } else {
                $job->execute();
            }

            $this->onAfterExecute(new CEvent($this, array("job" => $job)));
        }
    } while ($job);
}
200 
/**
 * Raise the "onBeforeExecute" event; runJobs() does this for a selected job
 * before calling its beforeExecute().
 *
 * @param CEvent $event event object; runJobs() passes the job under the "job" key
 */
public function onBeforeExecute($event)
{
    $this->raiseEvent('onBeforeExecute', $event);
}
208 
/**
 * Raise the "onAfterExecute" event; runJobs() does this once a job has been
 * started through executeAsync() or execute().
 *
 * @param CEvent $event event object; runJobs() passes the job under the "job" key
 */
public function onAfterExecute($event)
{
    $this->raiseEvent('onAfterExecute', $event);
}
216 
/**
 * Persist a job and log its validation errors when saving fails.
 *
 * @param Job $job job to save
 *
 * @return bool true when save() succeeded, false otherwise
 */
protected function saveJob(Job $job)
{
    $saved = $job->save();
    if ($saved) {
        return true;
    }

    $details = print_r($job->errors, true);
    Yii::log('Saving a job failed: '.$details, 'error');

    return false;
}
234 
/**
 * Format a UNIX timestamp as a "Y-m-d H:i:s" date string.
 *
 * The hour is written with a leading zero ("H"); the former "G" format
 * produced values such as "2024-01-01 9:05:00" for hours before 10.
 *
 * @param int|null $timestamp UNIX timestamp; the current time is used when null
 *
 * @return string formatted date in PHP's default timezone
 */
protected function timestampToDatabaseDate($timestamp = null)
{
    if ($timestamp === null) {
        $timestamp = time();
    }

    return date("Y-m-d H:i:s", $timestamp);
}
250 
/**
 * Look up stored jobs that have the same job_class as the given job and are
 * in status JobStatus::ENQUEUED.
 *
 * @param Job $job job whose job_class is searched for
 *
 * @return Job[]|null the matching jobs, or null when there are none
 */
protected function findEnqueuedDuplicateJob(Job $job)
{
    $criteria = new CDbCriteria();
    $criteria->condition = 'job_class=:job_class AND job_status_id=:job_status_id';
    $criteria->params = array(
        ':job_class' => $job->job_class,
        ':job_status_id' => JobStatus::ENQUEUED,
    );

    $matches = Job::model()->findAll($criteria);

    return empty($matches) ? null : $matches;
}
271 
/**
 * Reset every application component listed in $clearComponents by setting
 * it to null on the application.
 */
private function clearComponents()
{
    foreach ($this->clearComponents as $componentId) {
        Yii::trace('Clear component: '.$componentId);
        Yii::app()->setComponent($componentId, null);
    }
}
282 
{
    // The class used to resolve identifier names defaults to $class and can
    // be overridden by a "job_class" entry, and in turn by "filterClass".
    $filterClass = $class;
    if (isset($attributes['job_class'])) {
        $filterClass = $attributes['job_class'];
    }

    // "filterClass" is moved into "job_class" and removed from the attributes.
    if (isset($attributes['filterClass'])) {
        $filterClass = $attributes['filterClass'];
        $attributes['job_class'] = $filterClass;
        unset($attributes['filterClass']);
    }

    // When several classes are given only the first one is used for the lookup.
    if (is_array($filterClass)) {
        $filterClass = $filterClass[0];
    }

    $model = $class::model();

    /* @var $filterModel Job */
    $filterModel = $filterClass::model();

    // Names that are not attributes of $class must be identifiers of the
    // filter model; they are re-keyed to the key they have in identifiers().
    $identifierAttributes = array();
    foreach ($attributes as $name => $value) {
        if (!$model->hasAttribute($name)) {
            $identifiers = array_flip($filterModel->identifiers());
            if (isset($identifiers[$name])) {
                unset($attributes[$name]);
                $column = $identifiers[$name];
                $identifierAttributes[$column] = $value;
            } else {
                // Message fixed: the class name used to be wrapped in a stray "(" and "}".
                throw new Exception("Attribute {$name} not defined on {$filterClass} or mapped as identifier, defined identifiers: ".print_r($identifiers, true));
            }
        }
    }

    return array_merge($attributes, $identifierAttributes);
}
329 
/**
 * Fetch all records of the given model class that match the attributes.
 *
 * @param string $class      model class name; its static model() is queried
 * @param array  $attributes attribute values passed to findAllByAttributes()
 * @param mixed  $condition  optional additional condition passed through unchanged
 *
 * @return array result of findAllByAttributes()
 */
private function findAll($class, $attributes, $condition = null)
{
    return $class::model()->findAllByAttributes($attributes, $condition);
}
346 
{
    // Decides whether another process for the job class may be started:
    // returns false once the number of matching processes reaches the
    // optional "limit" entry, true otherwise.
    if (!isset($attributes['class'])) {
        throw new Exception('Job needs to define a class');
    }

    // A missing or empty limit is normalised to false.
    if (empty($attributes['limit'])) {
        $attributes['limit'] = false;
    }

    // Count the "ps ax" lines that mention name=<class>; one is subtracted,
    // presumably to discount the grep process itself (TODO confirm).
    $command = 'ps ax | grep "name="'.escapeshellarg($attributes['class']);
    exec($command, $processes);
    $cntProcesses = count($processes) - 1;

    $limitReached = $cntProcesses > 0
        && !empty($attributes['limit'])
        && $cntProcesses >= $attributes['limit'];

    if (!$limitReached) {
        return true;
    }

    Yii::trace('Limit of executed processes: '.$attributes['limit']);

    return false;
}
375 }