HCE Project PHP language client API bindings OOP style  1.1.1
Hierarchical Cluster Engine PHP Client Interface API OOP style
 All Classes Namespaces Files Functions Variables Pages
Request.inc.php
Go to the documentation of this file.
1 <?php
2 
14 namespace HCE\transport {
15 
16  require_once 'zmsg.php';
17  require_once 'Constants.inc.php';
18  require_once 'Response.inc.php';
19  require_once 'Connection.inc.php';
20 
24  class Request {
28  protected $connection = NULL;
29 
33  protected $messageId = NULL;
34 
38  protected $messageBody = NULL;
39 
43  protected $timeout = PROTOCOL_MSG_RESPONSE_TIMEOUT_DEFAULT;
44 
48  protected $error = 0;
49 
57  public function setConnection($connection = null) {
58  if ($connection == NULL) {
59  $this->connection = new \HCE\transport\Connection ();
60  } else {
61  $this->connection = $connection;
62  }
63  }
64 
70  public function getConnection() {
71  return $this->connection;
72  }
73 
82  public function setMessageId($messageId) {
83  if ($messageId == '') {
84  return false;
85  } else {
86  $this->messageId = $messageId;
87  return true;
88  }
89  }
90 
96  public function getMessageId() {
97  return $this->messageId;
98  }
99 
107  public function setMessageBody($messageBody) {
108  $this->messageBody = $messageBody;
109  }
110 
116  public function getMessageBody() {
117  return $this->messageBody;
118  }
119 
127  public function setErrorCode($errorCode) {
128  $this->error = $errorCode;
129  }
130 
136  public function getErrorCode() {
137  return $this->error;
138  }
139 
147  public function __construct($connection = null) {
148  // Set connection
149  $this->setConnection ( $connection );
150 
151  // Set message Id
152  $this->setMessageId ( \HCE\transport\IdGenerator::getMessageId () );
153 
154  // Set message Body
155  $this->setMessageBody ( '' );
156  }
157 
161  public function __destruct() {
162  $this->connection = NULL;
163  }
164 
176  public function execute($msgBody = NULL, $timeout = null) {
177  // Reset error state
178  $this->setErrorCode ( 0 );
179 
180  // New response object
181  $response = new \HCE\transport\Response ();
182 
183  $message = array (
184  'id' => $this->getMessageId (),
185  'body' => ($msgBody === NULL ? $this->getMessageBody () : $msgBody)
186  );
187  // Send message
188  $ret = $this->messageSend ( $message );
189 
190  // Check state
191  if ($ret ['error'] == PROTOCOL_ERROR_OK) {
192  // Receive response
193  $responseItem = $this->messageReceive ( $timeout === null ? $this->timeout : $timeout );
194  // Check state
195  if ($responseItem ['error'] == PROTOCOL_ERROR_OK) {
196  if ($responseItem ['messages'] !== NULL) {
197  // Check response items corespondence for requested message and filter items set
198  $responseItems = array ();
199  $wrongItems = 0;
200  foreach ( $responseItem ['messages'] as $responseItem ) {
201  if ($responseItem ['id'] == $message ['id']) {
202  $responseItems [] = $responseItem;
203  } else {
204  $wrongItems ++;
205  }
206  }
207  if ($wrongItems > 0) {
208  $response->setErrorCode ( PROTOCOL_ERROR_WRONG_RESPONSE_MSG_ID );
209  }
210  $response->setResponses ( $responseItems );
211  }
212  } else {
213  // Set own error code
214  $this->setErrorCode ( $responseItem ['error'] );
215  // Set error code for response object
216  $response->setErrorCode ( $responseItem ['error'] );
217  }
218  } else {
219  // Set error code
220  $this->setErrorCode ( $ret ['error'] );
221  // Set error code for response object
222  $response->setErrorCode ( $ret ['error'] );
223  }
224 
225  return $response;
226  }
227 
236  public function messageSend($fields_array) {
237  // Init return HCE message array
238  $ret = array (
239  'error' => PROTOCOL_ERROR_OK,
240  'message' => null
241  );
242 
243  if (! isset ( $fields_array ['body'] ) || ! isset ( $fields_array ['id'] )) {
244  $ret ['error'] = PROTOCOL_ERROR_FIELDS_NOT_SET;
245  } else {
246  // Create message
247  $ret ['message'] = new \Zmsg ( $this->connection->socket );
248 
249  // Set fields
250  $ret ['message']->wrap ( $fields_array ['body'], NULL );
251  $ret ['message']->wrap ( $fields_array ['id'], NULL );
252 
253  // Send message
254  $ret ['message']->send ();
255  }
256 
257  return $ret;
258  }
259 
268  protected function messageReceive($timeout = PROTOCOL_MSG_RESPONSE_TIMEOUT_DEFAULT) {
269  // Init return HCE response item array
270  $ret = array (
271  'error' => PROTOCOL_ERROR_OK,
272  'messages' => array ()
273  );
274 
275  // Poll socket for a reply, with timeout
276  $read = $write = array ();
277  $poll = new \ZMQPoll ();
278  $poll->add ( $this->connection->socket, \ZMQ::POLL_IN );
279  $events = $poll->poll ( $read, $write, $timeout );
280  // Handle events
281  if ($events) {
282  foreach ( $read as $socket ) {
283  $zmsg_r = new \Zmsg ( $socket );
284  $zmsg_r->recv ();
285  $ret ['messages'] [] = array (
286  'error' => PROTOCOL_ERROR_OK,
287  'message' => $zmsg_r,
288  'id' => $zmsg_r->unwrap (),
289  'body' => $zmsg_r->unwrap ()
290  );
291  // var_dump ( $ret );
292  }
293  } else {
294  $ret ['error'] = PROTOCOL_ERROR_TIMEOUT;
295  }
296  $poll->remove ( $this->connection->socket );
297 
298  return $ret;
299  }
300  }
301 }
302 
303 ?>