14 namespace HCE\transport {
16 require_once
'zmsg.php';
17 require_once
'Constants.inc.php';
18 require_once
'Response.inc.php';
19 require_once
'Connection.inc.php';
33 protected $messageId = NULL;
43 protected $timeout = PROTOCOL_MSG_RESPONSE_TIMEOUT_DEFAULT;
59 $this->connection = new \HCE\transport\Connection ();
83 if ($messageId ==
'') {
86 $this->messageId = $messageId;
97 return $this->messageId;
128 $this->error = $errorCode;
155 $this->setMessageBody (
'' );
162 $this->connection = NULL;
// Sends one request message and gathers the replies that belong to it.
//
// $msgBody - body to send; when NULL, the value of $this->getMessageBody() is sent instead.
// $timeout - forwarded to messageReceive(); when null, $this->timeout is used instead.
//
// NOTE(review): this listing has gaps (the embedded original line numbers skip), so the
// return statement, the else lines and the closing braces are not visible here.
// Presumably the method returns $response - confirm against the full file.
176 public function execute($msgBody = NULL, $timeout = null) {
// Clear this object's error code before starting a new exchange.
178 $this->setErrorCode ( 0 );
// Response object that receives the error code and the accepted reply items below.
181 $response = new \HCE\transport\Response ();
// Request fields; the opening of the enclosing array literal is not visible in this
// listing - presumably it is assigned to $message, which is used below.
184 'id' => $this->getMessageId (),
// An explicitly passed $msgBody takes precedence over the stored message body.
185 'body' => ($msgBody === NULL ? $this->getMessageBody () : $msgBody)
188 $ret = $this->messageSend ( $message );
// Only wait for a reply when the send step reported PROTOCOL_ERROR_OK.
191 if ($ret [
'error'] == PROTOCOL_ERROR_OK) {
// Fall back to the instance-level timeout when the caller did not pass one.
193 $responseItem = $this->messageReceive ( $timeout === null ? $this->timeout : $timeout );
195 if ($responseItem [
'error'] == PROTOCOL_ERROR_OK) {
196 if ($responseItem [
'messages'] !== NULL) {
198 $responseItems = array ();
// NOTE(review): the loop variable reuses the name $responseItem, so each iteration
// overwrites the variable that held the messageReceive() result.
200 foreach ( $responseItem [
'messages'] as $responseItem ) {
// Accept only replies whose id equals the id of the request just sent (loose == comparison).
201 if ($responseItem [
'id'] == $message [
'id']) {
202 $responseItems [] = $responseItem;
// $wrongItems is maintained on lines not visible in this listing - presumably a count
// of replies whose id did not match; confirm against the full file.
207 if ($wrongItems > 0) {
208 $response->setErrorCode ( PROTOCOL_ERROR_WRONG_RESPONSE_MSG_ID );
// Hand the accepted reply items to the response object.
210 $response->setResponses ( $responseItems );
// Copy the 'error' value of $responseItem to this object and to the response.
// Presumably this is the else branch of the receive-error check above - the else line is not visible.
214 $this->setErrorCode ( $responseItem [
'error'] );
216 $response->setErrorCode ( $responseItem [
'error'] );
// Copy the send step's error code to this object and to the response.
// Presumably this is the else branch of the send-error check above - the else line is not visible.
220 $this->setErrorCode ( $ret [
'error'] );
222 $response->setErrorCode ( $ret [
'error'] );
239 'error' => PROTOCOL_ERROR_OK,
243 if (! isset ( $fields_array [
'body'] ) || ! isset ( $fields_array [
'id'] )) {
244 $ret [
'error'] = PROTOCOL_ERROR_FIELDS_NOT_SET;
247 $ret [
'message'] = new \Zmsg ( $this->connection->socket );
250 $ret [
'message']->wrap ( $fields_array [
'body'], NULL );
251 $ret [
'message']->wrap ( $fields_array [
'id'], NULL );
254 $ret [
'message']->send ();
// Polls the connection socket for readable data and unwraps every message found.
//
// $timeout - passed unchanged to ZMQPoll::poll(); defaults to PROTOCOL_MSG_RESPONSE_TIMEOUT_DEFAULT.
//            NOTE(review): presumably milliseconds, per the php-zmq poll() API - confirm.
//
// The visible lines build a result with the keys 'error' and 'messages'; each entry of
// 'messages' carries 'error', 'message', 'id' and 'body'.
// NOTE(review): this listing has gaps (the embedded original line numbers skip), so the
// return statement and several braces/conditions are not visible here.
268 protected function messageReceive($timeout = PROTOCOL_MSG_RESPONSE_TIMEOUT_DEFAULT) {
// Result defaults: no error and an empty message list. The opening of the enclosing
// array literal is not visible - presumably it is assigned to $ret, which is used below.
271 'error' => PROTOCOL_ERROR_OK,
272 'messages' => array ()
// $read and $write are handed to poll() below; the loop further down iterates $read.
276 $read = $write = array ();
277 $poll = new \ZMQPoll ();
// Watch the connection socket for incoming data only.
278 $poll->add ( $this->connection->socket, \ZMQ::POLL_IN );
279 $events = $poll->poll ( $read, $write, $timeout );
// One Zmsg per socket in $read; any explicit receive call on $zmsg_r is not visible in this listing.
282 foreach ( $read as $socket ) {
283 $zmsg_r = new \Zmsg ( $socket );
285 $ret [
'messages'] [] = array (
286 'error' => PROTOCOL_ERROR_OK,
287 'message' => $zmsg_r,
// Order matters: id is unwrapped first, then body. This mirrors the send path visible
// earlier in the file, which wraps the body first and the id second.
288 'id' => $zmsg_r->unwrap (),
289 'body' => $zmsg_r->unwrap ()
// The condition guarding this assignment is not visible in this listing -
// presumably it applies when poll() reported no events; confirm.
294 $ret [
'error'] = PROTOCOL_ERROR_TIMEOUT;
// Detach the socket from the poll set.
296 $poll->remove ( $this->connection->socket );