HCE Project PHP language client API bindings OOP style  1.1.1
Hierarchical Cluster Engine PHP Client Interface API OOP style
 All Classes Namespaces Files Functions Variables Pages
zmsg.php
Go to the documentation of this file.
1 <?php
2 /* =========================================================================
3  zmsg.php
4 
5  Multipart message class for example applications.
6 
7  Follows the ZFL class conventions and is further developed as the ZFL
8  zfl_msg class. See http://zfl.zeromq.org for more details.
9 
10  -------------------------------------------------------------------------
11  Copyright (c) 1991-2010 iMatix Corporation <www.imatix.com>
12  Copyright other contributors as noted in the AUTHORS file.
13 
14  This file is part of the ZeroMQ Guide: http://zguide.zeromq.org
15 
16  This is free software; you can redistribute it and/or modify it under the
17  terms of the GNU Lesser General Public License as published by the Free
18  Software Foundation; either version 3 of the License, or (at your option)
19  any later version.
20 
21  This software is distributed in the hope that it will be useful, but
22  WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABIL-
23  ITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General
24  Public License for more details.
25 
26  You should have received a copy of the GNU Lesser General Public License
27  along with this program. If not, see <http://www.gnu.org/licenses/>.
28  =========================================================================
29 
30  @author Ian Barber <ian(dot)barber(at)gmail(dot)com>
31 */
32 
/**
 * Multipart message container for ZeroMQ example applications.
 *
 * A message is an ordered list of parts (frames). Address parts sit at the
 * front of the list, the body is the last part, and an empty part may be used
 * as a delimiter between the address envelope and the body.
 */
class zmsg
{
    /**
     * Message parts, kept as a zero-based list (index 0 is the front).
     *
     * @var array
     */
    private $_parts = array();

    /**
     * Socket used by recv() and send(); may be null until set_socket() is called.
     *
     * @var ZMQSocket|null
     */
    private $_socket;

    /**
     * Create an empty message.
     *
     * @param ZMQSocket|null $socket Optional socket to send to / receive from.
     */
    public function __construct($socket = null)
    {
        $this->_socket = $socket;
    }

    /**
     * Encode a binary address as a printable string: "@" followed by the
     * hexadecimal form of every byte of $data.
     *
     * @param string $data Binary address.
     *
     * @return string Printable form (35 characters for a 17-byte input).
     */
    public function s_encode_uuid($data)
    {
        return "@" . bin2hex($data);
    }

    /**
     * Decode a printable address back to binary: the first character (the
     * "@" marker) is dropped and the remaining hex digits are packed to bytes.
     *
     * @param string $data Printable address as produced by s_encode_uuid().
     *
     * @return string Binary address.
     */
    public function s_decode_uuid($data)
    {
        return pack("H*", substr($data, 1));
    }

    /**
     * Set the socket used by recv() and send().
     *
     * @param ZMQSocket $socket
     *
     * @return zmsg This message, for chaining.
     */
    public function set_socket(ZMQSocket $socket)
    {
        $this->_socket = $socket;

        return $this;
    }

    /**
     * Receive a complete multipart message from the socket, replacing any
     * parts currently held.
     *
     * @return zmsg This message, for chaining.
     *
     * @throws Exception When no socket has been supplied.
     */
    public function recv()
    {
        if (!isset($this->_socket)) {
            throw new Exception("No socket supplied");
        }

        $this->_parts = array();
        // Keep reading frames for as long as the socket reports more to come.
        do {
            $this->_parts[] = $this->_socket->recv();
        } while ($this->_socket->getSockOpt(ZMQ::SOCKOPT_RCVMORE));

        return $this;
    }

    /**
     * Send every part of the message on the socket as one multipart message.
     *
     * @param bool $clear When true (default) the parts are discarded after sending.
     *
     * @return zmsg This message, for chaining.
     *
     * @throws Exception When no socket has been supplied.
     */
    public function send($clear = true)
    {
        if (!isset($this->_socket)) {
            throw new Exception("No socket supplied");
        }

        $remaining = count($this->_parts);
        foreach ($this->_parts as $part) {
            $remaining--;
            // Every frame except the final one is flagged SNDMORE. The final
            // frame uses mode 0 rather than null, because the mode argument
            // is an integer.
            $mode = $remaining > 0 ? ZMQ::MODE_SNDMORE : 0;
            $this->_socket->send($part, $mode);
        }

        if ($clear) {
            $this->_parts = array();
        }

        return $this;
    }

    /**
     * Number of parts currently in the message.
     *
     * @return int
     */
    public function parts()
    {
        return count($this->_parts);
    }

    /**
     * Return the last part of the message.
     *
     * @return mixed The last part, or null when the message is empty.
     */
    public function last()
    {
        $count = count($this->_parts);

        return $count > 0 ? $this->_parts[$count - 1] : null;
    }

    /**
     * Replace the last part of the message; on an empty message the value
     * becomes the first part.
     *
     * @param mixed $set New value for the last part.
     *
     * @return zmsg This message, for chaining.
     */
    public function set_last($set)
    {
        // Same semantics as body_set(); delegating avoids writing to index -1
        // when the message is empty.
        return $this->body_set($set);
    }

    /**
     * Return the message body, i.e. the last part.
     *
     * @return mixed The body, or null when the message is empty.
     */
    public function body()
    {
        return $this->last();
    }

    /**
     * Set the message body (the last part). On an empty message the body
     * becomes the first and only part.
     *
     * @param mixed $body
     *
     * @return zmsg This message, for chaining.
     */
    public function body_set($body)
    {
        $count = count($this->_parts);
        $pos = $count > 0 ? $count - 1 : 0;
        $this->_parts[$pos] = $body;

        return $this;
    }

    /**
     * Set the message body from a printf-style format string.
     *
     * The first argument is the format; remaining arguments are its values.
     *
     * @return zmsg This message, for chaining.
     */
    public function body_fmt()
    {
        $args = func_get_args();
        $format = array_shift($args);
        $this->body_set(vsprintf($format, $args));

        return $this;
    }

    /**
     * Push a part onto the front of the message.
     *
     * @param mixed $part
     *
     * @return zmsg This message, for chaining.
     */
    public function push($part)
    {
        array_unshift($this->_parts, $part);

        return $this;
    }

    /**
     * Remove and return the part at the front of the message.
     *
     * @return mixed The removed part, or null when the message is empty.
     */
    public function pop()
    {
        return array_shift($this->_parts);
    }

    /**
     * Return the first part of the message without removing it. A 17-byte
     * part whose first byte is NUL is returned in printable form (see
     * s_encode_uuid()); any other part is returned unchanged.
     *
     * @return mixed The address, or null when the message is empty.
     */
    public function address()
    {
        if (count($this->_parts) === 0) {
            return null;
        }

        $address = $this->_parts[0];

        return self::_is_binary_uuid($address) ? $this->s_encode_uuid($address) : $address;
    }

    /**
     * Wrap the message in a new address envelope: optionally push a
     * delimiter, then push the address in front of it.
     *
     * A printable address (leading "@") is converted back to binary first.
     * 35 characters is what s_encode_uuid() produces for a 17-byte address;
     * 33 characters is still accepted as before for backward compatibility.
     *
     * @param string      $address Address to prepend.
     * @param string|null $delim   Delimiter part to insert, or null for none.
     *
     * @return zmsg This message, for chaining.
     */
    public function wrap($address, $delim = null)
    {
        if ($delim !== null) {
            $this->push($delim);
        }

        if (is_string($address)) {
            $len = strlen($address);
            // Length is tested first so an empty string is never indexed.
            if (($len === 33 || $len === 35) && $address[0] === '@') {
                $address = $this->s_decode_uuid($address);
            }
        }
        $this->push($address);

        return $this;
    }

    /**
     * Remove and return the front address; if the part that follows is an
     * empty delimiter, remove that too.
     *
     * @return mixed The removed address, or null when the message is empty.
     */
    public function unwrap()
    {
        $address = $this->pop();

        // Only a genuinely empty part counts as a delimiter. A truthiness
        // test would also discard a part containing the string "0".
        if (count($this->_parts) > 0 && (string) $this->_parts[0] === '') {
            $this->pop();
        }

        return $address;
    }

    /**
     * Render the message for debugging: a separator line followed by one
     * line per part in the form "[length] content". Binary 17-byte addresses
     * are shown in printable form.
     *
     * @return string
     */
    public function __toString()
    {
        $string = "--------------------------------------" . PHP_EOL;
        foreach ($this->_parts as $part) {
            if (self::_is_binary_uuid($part)) {
                $part = $this->s_encode_uuid($part);
            }
            $string .= sprintf("[%03d] %s %s", strlen($part), $part, PHP_EOL);
        }

        return $string;
    }

    /**
     * Self test. Requires the ZMQ extension; prints "OK" or "FAIL".
     *
     * @return int|bool Truthy when every check passed.
     */
    public static function test()
    {
        $result = true;
        $context = new ZMQContext();
        $output = new ZMQSocket($context, ZMQ::SOCKET_DEALER);
        $output->bind("inproc://zmsg_selftest");

        $input = new ZMQSocket($context, ZMQ::SOCKET_ROUTER);
        $input->connect("inproc://zmsg_selftest");

        // Test send and receive of single-part message
        $zmsgo = new self($output);
        $zmsgo->body_set("Hello");
        $result &= assert($zmsgo->body() == "Hello");
        $zmsgo->send();

        $zmsgi = new self($input);
        $zmsgi->recv();
        $result &= assert($zmsgi->parts() == 2);
        $result &= assert($zmsgi->body() == "Hello");
        echo $zmsgi;

        // Test send and receive of multi-part message
        $zmsgo = new self($output);
        $zmsgo->body_set("Hello");
        $zmsgo->wrap("address1", "");
        $zmsgo->wrap("address2");
        $result &= assert($zmsgo->parts() == 4);
        echo $zmsgo;
        $zmsgo->send();

        $zmsgi = new self($input);
        $zmsgi->recv();
        $result &= assert($zmsgi->parts() == 5);
        $zmsgi->unwrap();
        $result &= assert($zmsgi->unwrap() == "address2");

        $zmsgi->body_fmt("%s%s", 'W', "orld");
        $result &= assert($zmsgi->body() == "World");

        // Pull off address 1, check that empty part was dropped
        $zmsgi->unwrap();
        $result &= assert($zmsgi->parts() == 1);

        // Check that message body was correctly modified
        $part = $zmsgi->pop();
        $result &= assert($part == "World");
        $result &= assert($zmsgi->parts() == 0);

        // Test load and save
        $path = sys_get_temp_dir() . "/zmsgtest.zmsg";
        $zmsg = new self();
        $zmsg->body_set("Hello");
        $zmsg->wrap("address1", "");
        $zmsg->wrap("address2");
        $result &= assert($zmsg->parts() == 4);
        $fh = fopen($path, 'w');
        $zmsg->save($fh);
        fclose($fh);
        $fh = fopen($path, 'r');
        $zmsg2 = new self();
        $zmsg2->load($fh);
        $result &= assert($zmsg2->last() == $zmsg->last());
        fclose($fh);
        // Do not leave the scratch file behind in the temp directory.
        unlink($path);
        $result &= assert($zmsg2->parts() == 4);
        echo ($result ? "OK" : "FAIL"), PHP_EOL;

        return $result;
    }

    /**
     * Write the message to an open file handle. Each part is stored as a
     * single length byte followed by the part's bytes.
     *
     * @param resource $fh Writable file handle.
     *
     * @return zmsg This message, for chaining.
     *
     * @throws Exception When a part is longer than 255 bytes, since its
     *                   length cannot be stored in the one-byte prefix.
     */
    public function save($fh)
    {
        // Validate everything up front so a failure never leaves a
        // half-written file behind.
        foreach ($this->_parts as $part) {
            if (strlen($part) > 255) {
                throw new Exception("Message part too long to save (maximum is 255 bytes)");
            }
        }

        foreach ($this->_parts as $part) {
            $size = strlen($part);
            fwrite($fh, chr($size));
            if ($size > 0) {
                fwrite($fh, $part);
            }
        }

        return $this;
    }

    /**
     * Read parts written by save() from an open file handle and append them
     * to this message.
     *
     * @param resource $fh Readable file handle.
     *
     * @return zmsg This message, for chaining.
     */
    public function load($fh)
    {
        while (!feof($fh)) {
            $size = fread($fh, 1);
            // Compare explicitly: a length byte of 48 is the character "0",
            // which PHP treats as falsy.
            if ($size === false || $size === '') {
                break;
            }
            $length = ord($size);
            $this->_parts[] = $length > 0 ? fread($fh, $length) : '';
        }

        return $this;
    }

    /**
     * Tell whether a part looks like a binary UUID-style address: exactly
     * 17 bytes with a leading NUL byte.
     *
     * @param mixed $part
     *
     * @return bool
     */
    private static function _is_binary_uuid($part)
    {
        return is_string($part) && strlen($part) === 17 && $part[0] === "\0";
    }
}