HCE Project PHP language client API bindings  1.5.1
Hierarchical Cluster Engine PHP Client Interface API
 All Classes Namespaces Files Functions Variables Pages
zmsg.php
Go to the documentation of this file.
1 <?php
2 /* =========================================================================
3  zmsg.php
4 
5  Multipart message class for example applications.
6 
7  Follows the ZFL class conventions and is further developed as the ZFL
8  zfl_msg class. See http://zfl.zeromq.org for more details.
9 
10  -------------------------------------------------------------------------
11  Copyright (c) 1991-2010 iMatix Corporation <www.imatix.com>
12  Copyright other contributors as noted in the AUTHORS file.
13 
14  This file is part of the ZeroMQ Guide: http://zguide.zeromq.org
15 
16  This is free software; you can redistribute it and/or modify it under the
17  terms of the GNU Lesser General Public License as published by the Free
18  Software Foundation; either version 3 of the License, or (at your option)
19  any later version.
20 
21  This software is distributed in the hope that it will be useful, but
22  WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABIL-
23  ITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General
24  Public License for more details.
25 
26  You should have received a copy of the GNU Lesser General Public License
27  along with this program. If not, see <http://www.gnu.org/licenses/>.
28  =========================================================================
29 
30  @author Ian Barber <ian(dot)barber(at)gmail(dot)com>
31 */
32 
/**
 * Multipart ZeroMQ message.
 *
 * Holds an ordered list of message parts (frames). The first part is treated
 * as the outermost envelope address and the last part as the message body.
 * Parts can be received from / sent over a ZMQSocket, or saved to / loaded
 * from a stream using a simple length-prefixed format.
 */
class zmsg
{
    /**
     * Message parts in wire order: index 0 is the outermost address frame,
     * the final element is the body.
     *
     * @var array
     */
    private $_parts = array();

    /**
     * Socket used by recv() and send(); null until one is supplied.
     *
     * @var ZMQSocket|null
     */
    private $_socket;

    /**
     * @param ZMQSocket|null $socket Socket to receive from / send on. May be
     *                               omitted and supplied later via set_socket().
     */
    public function __construct($socket = null)
    {
        $this->_socket = $socket;
    }

    /**
     * Encode binary data as a printable string: "@" followed by the
     * hexadecimal representation of every byte of $data.
     *
     * @param string $data Binary data.
     *
     * @return string
     */
    public function s_encode_uuid($data)
    {
        return "@" . bin2hex($data);
    }

    /**
     * Reverse of s_encode_uuid(): drop the first character and convert the
     * remaining hexadecimal digits back to binary.
     *
     * @param string $data Printable string as produced by s_encode_uuid().
     *
     * @return string Binary data.
     */
    public function s_decode_uuid($data)
    {
        return pack("H*", substr($data, 1));
    }

    /**
     * Set the socket used by recv() and send().
     *
     * @param ZMQSocket $socket
     *
     * @return zmsg $this, for chaining.
     */
    public function set_socket(ZMQSocket $socket)
    {
        $this->_socket = $socket;

        return $this;
    }

    /**
     * Receive a complete multipart message from the socket, replacing any
     * parts currently held.
     *
     * @return zmsg $this, for chaining.
     *
     * @throws Exception If no socket has been supplied.
     */
    public function recv()
    {
        if (null === $this->_socket) {
            throw new Exception("No socket supplied");
        }

        $this->_parts = array();
        // Keep reading frames for as long as the socket reports more to come.
        do {
            $this->_parts[] = $this->_socket->recv();
        } while ($this->_socket->getSockOpt(ZMQ::SOCKOPT_RCVMORE));

        return $this;
    }

    /**
     * Send all parts over the socket as one multipart message.
     *
     * @param bool $clear When true (default) the held parts are discarded
     *                    after sending.
     *
     * @return zmsg $this, for chaining.
     *
     * @throws Exception If no socket has been supplied.
     */
    public function send($clear = true)
    {
        if (null === $this->_socket) {
            throw new Exception("No socket supplied");
        }

        $remaining = count($this->_parts);
        foreach ($this->_parts as $part) {
            $remaining--;
            // Every frame but the last is flagged SNDMORE. The final frame
            // uses mode 0 (an int, not null, as the mode parameter is an int).
            $mode = $remaining > 0 ? ZMQ::MODE_SNDMORE : 0;
            $this->_socket->send($part, $mode);
        }

        if ($clear) {
            $this->_parts = array();
        }

        return $this;
    }

    /**
     * Number of parts currently held.
     *
     * @return int
     */
    public function parts()
    {
        return count($this->_parts);
    }

    /**
     * Return the last part without removing it.
     *
     * @return string|null The last part, or null when the message is empty.
     */
    public function last()
    {
        $count = count($this->_parts);

        return $count > 0 ? $this->_parts[$count - 1] : null;
    }

    /**
     * Replace the last part; on an empty message the value becomes the
     * first (and only) part.
     *
     * @param string $set New content for the last part.
     *
     * @return void
     */
    public function set_last($set)
    {
        // body_set() targets index 0 on an empty message rather than -1.
        $this->body_set($set);
    }

    /**
     * Return the message body, which is the last part.
     *
     * @return string|null The body, or null when the message is empty.
     */
    public function body()
    {
        return $this->last();
    }

    /**
     * Set the message body: overwrite the last part, or add a first part if
     * the message is empty.
     *
     * @param string $body
     *
     * @return zmsg $this, for chaining.
     */
    public function body_set($body)
    {
        $pos = max(count($this->_parts) - 1, 0);
        $this->_parts[$pos] = $body;

        return $this;
    }

    /**
     * Set the message body from a printf-style format. The first argument is
     * the format string and the remaining arguments are its values.
     *
     * @return zmsg $this, for chaining.
     */
    public function body_fmt()
    {
        $args = func_get_args();
        $format = array_shift($args);
        $this->body_set(vsprintf($format, $args));

        return $this;
    }

    /**
     * Insert a part at the front of the message.
     *
     * @param string $part
     *
     * @return void
     */
    public function push($part)
    {
        array_unshift($this->_parts, $part);
    }

    /**
     * Remove and return the first part.
     *
     * @return string|null The removed part, or null when the message is empty.
     */
    public function pop()
    {
        return array_shift($this->_parts);
    }

    /**
     * Return the first part (the outermost address) without removing it.
     * A binary identity (17 bytes starting with a NUL byte) is returned in
     * the printable form produced by s_encode_uuid().
     *
     * @return string|null The address, or null when the message is empty.
     */
    public function address()
    {
        if (count($this->_parts) === 0) {
            return null;
        }

        $address = $this->_parts[0];

        return $this->_is_uuid($address) ? $this->s_encode_uuid($address) : $address;
    }

    /**
     * Wrap the message in a new address envelope: optionally push a
     * delimiter part, then push the address in front of it.
     *
     * @param string      $address Address to prepend. A printable identity
     *                             ("@" + hex digits) is converted back to
     *                             binary before it is pushed.
     * @param string|null $delim   Delimiter part to insert between the address
     *                             and the existing parts; null inserts none.
     *
     * @return zmsg $this, for chaining.
     */
    public function wrap($address, $delim = null)
    {
        if ($delim !== null) {
            $this->push($delim);
        }

        // 33 = "@" + 32 hex digits (accepted historically); 35 = "@" + the
        // 34 hex digits s_encode_uuid() yields for a 17-byte identity, so the
        // value returned by address() round-trips. Length is checked first so
        // an empty address never triggers a string-offset warning.
        $len = strlen($address);
        if (($len === 33 || $len === 35) && $address[0] === '@') {
            $address = $this->s_decode_uuid($address);
        }
        $this->push($address);

        return $this;
    }

    /**
     * Remove and return the outermost address. If the part that follows it
     * is an empty delimiter, that part is removed as well.
     *
     * @return string|null The removed address, or null when the message was
     *                     empty.
     */
    public function unwrap()
    {
        $address = $this->pop();

        // Compare against the empty string explicitly: a part containing
        // "0" is falsy in PHP but is real content and must be kept.
        if (count($this->_parts) > 0 && (string) $this->_parts[0] === '') {
            $this->pop();
        }

        return $address;
    }

    /**
     * Render the message for debugging: a separator line followed by one
     * line per part showing its length and content. Binary identities are
     * shown in their printable form.
     *
     * @return string
     */
    public function __toString()
    {
        $string = "--------------------------------------" . PHP_EOL;
        foreach ($this->_parts as $part) {
            if ($this->_is_uuid($part)) {
                $part = $this->s_encode_uuid($part);
            }
            $string .= sprintf("[%03d] %s %s", strlen($part), $part, PHP_EOL);
        }

        return $string;
    }

    /**
     * Self test. Exercises send/receive over an inproc DEALER/ROUTER pair
     * (so the ZMQ extension must be loaded), envelope handling, and
     * save/load through a temporary file. Prints "OK" or "FAIL".
     *
     * @return int Truthy when every assertion passed.
     */
    public static function test()
    {
        $result = true;
        $context = new ZMQContext();
        $output = new ZMQSocket($context, ZMQ::SOCKET_DEALER);
        $output->bind("inproc://zmsg_selftest");

        $input = new ZMQSocket($context, ZMQ::SOCKET_ROUTER);
        $input->connect("inproc://zmsg_selftest");

        // Test send and receive of single-part message
        $zmsgo = new zmsg($output);
        $zmsgo->body_set("Hello");
        $result &= assert($zmsgo->body() == "Hello");
        $zmsgo->send();

        $zmsgi = new zmsg($input);
        $zmsgi->recv();
        $result &= assert($zmsgi->parts() == 2);
        $result &= assert($zmsgi->body() == "Hello");
        echo $zmsgi;

        // Test send and receive of multi-part message
        $zmsgo = new zmsg($output);
        $zmsgo->body_set("Hello");
        $zmsgo->wrap("address1", "");
        $zmsgo->wrap("address2");
        $result &= assert($zmsgo->parts() == 4);
        echo $zmsgo;
        $zmsgo->send();

        $zmsgi = new zmsg($input);
        $zmsgi->recv();
        $result &= assert($zmsgi->parts() == 5);
        $zmsgi->unwrap();
        $result &= assert($zmsgi->unwrap() == "address2");

        $zmsgi->body_fmt("%s%s", 'W', "orld");
        $result &= assert($zmsgi->body() == "World");

        // Pull off address 1, check that empty part was dropped
        $zmsgi->unwrap();
        $result &= assert($zmsgi->parts() == 1);

        // Check that message body was correctly modified
        $part = $zmsgi->pop();
        $result &= assert($part == "World");
        $result &= assert($zmsgi->parts() == 0);

        // Test load and save
        $path = sys_get_temp_dir() . "/zmsgtest.zmsg";
        $zmsg = new zmsg();
        $zmsg->body_set("Hello");
        $zmsg->wrap("address1", "");
        $zmsg->wrap("address2");
        $result &= assert($zmsg->parts() == 4);
        $fh = fopen($path, 'w');
        $zmsg->save($fh);
        fclose($fh);
        $fh = fopen($path, 'r');
        $zmsg2 = new zmsg();
        $zmsg2->load($fh);
        $result &= assert($zmsg2->last() == $zmsg->last());
        fclose($fh);
        // Do not leave the scratch file behind.
        unlink($path);
        $result &= assert($zmsg2->parts() == 4);
        echo ($result ? "OK" : "FAIL"), PHP_EOL;

        return $result;
    }

    /**
     * Write the message to an open stream. Each part is stored as a single
     * length byte followed by that many bytes of content.
     *
     * @param resource $fh Writable stream handle.
     *
     * @return zmsg $this, for chaining.
     *
     * @throws Exception If a part is longer than 255 bytes, the largest
     *                   length the one-byte prefix can represent. Nothing is
     *                   written in that case.
     */
    public function save($fh)
    {
        // Validate everything up front so a failure never leaves a
        // half-written (and silently corrupt) file behind.
        foreach ($this->_parts as $part) {
            if (strlen($part) > 255) {
                throw new Exception("Message part too long to save (max 255 bytes)");
            }
        }

        foreach ($this->_parts as $part) {
            fwrite($fh, chr(strlen($part)) . $part);
        }

        return $this;
    }

    /**
     * Read parts written by save() from an open stream and append them to
     * the parts already held.
     *
     * @param resource $fh Readable stream handle.
     *
     * @return zmsg $this, for chaining.
     */
    public function load($fh)
    {
        while (!feof($fh)) {
            $size = fread($fh, 1);
            // Test for "nothing read" explicitly: the length byte for a
            // 48-byte part is the character "0", which PHP treats as falsy.
            if ($size === false || $size === '') {
                break;
            }

            $length = ord($size);
            // fread() must not be asked for zero bytes; an empty part is
            // represented by the length byte alone.
            $this->_parts[] = $length > 0 ? fread($fh, $length) : '';
        }

        return $this;
    }

    /**
     * Tell whether a part looks like a binary socket identity: exactly
     * 17 bytes, the first of which is a NUL byte.
     *
     * @param mixed $part
     *
     * @return bool
     */
    private function _is_uuid($part)
    {
        return is_string($part) && strlen($part) === 17 && $part[0] === "\0";
    }
}