// Fragment: stores the supplied socket on the instance (enclosing definition not visible here).
56 $this->_socket = $socket;
// Fragment: returns "@" followed by the hexadecimal encoding of $data (two hex digits per byte).
68 return "@" . bin2hex($data);
// Fragment: drops the first character of $data and packs the remaining hex digits back into
// binary ("H*" = hex string, high nibble first), i.e. the reverse of an "@" + bin2hex() form.
79 return pack(
"H*", substr($data, 1));
// Fragment: a second place where the supplied socket is stored on the instance.
90 $this->_socket = $socket;
// Fragment of the receive path: refuses to run when no socket has been set on the instance.
105 if (!isset($this->_socket)) {
106 throw new Exception(
"No socket supplied");
// Any parts held from an earlier message are discarded before reading.
108 $this->_parts = array();
// Appends one frame read from the socket to the part list.
110 $this->_parts[] = $this->_socket->recv();
// Tests the socket's RCVMORE option, which reports whether further frames of the same
// multipart message are pending.
// NOTE(review): the enclosing loop and the body of this branch are not visible here;
// presumably this is the exit condition of a read loop -- confirm against the full file.
111 if (!$this->_socket->getSockOpt(ZMQ::SOCKOPT_RCVMORE)) {
// Sends the stored parts over the socket, flagging parts with ZMQ::MODE_SNDMORE so they travel
// as one multipart message. Throws Exception("No socket supplied") when no socket is set.
// NOTE(review): $clear defaults to true, but its use is not visible in this excerpt;
// presumably it guards the reset of $_parts below -- confirm against the full file.
126 public function send($clear =
true)
128 if (!isset($this->_socket)) {
129 throw new Exception(
"No socket supplied");
131 $count = count($this->_parts);
133 foreach ($this->_parts as $part) {
// The part for which the counter equals $count is sent with a null mode; every other part is
// flagged SNDMORE.
// NOTE(review): the initialisation of $i is not shown. Only if $i starts at 1 does the final
// part get the null mode; starting at 0 would flag every part SNDMORE. Confirm.
// NOTE(review): null is passed as the flags argument for that part; confirm ZMQSocket::send
// accepts null (rather than 0) on the PHP version in use.
134 $mode = $i++ == $count ? null : ZMQ::MODE_SNDMORE;
135 $this->_socket->send($part, $mode);
// NOTE(review): unset() immediately followed by reassignment looks redundant; the assignment
// alone leaves $_parts as an empty array.
138 unset($this->_parts);
139 $this->_parts = array();
// Fragment: returns the number of parts currently held.
152 return count($this->_parts);
// Fragment: returns the final part of the list.
// NOTE(review): with an empty list the index evaluates to -1, which reads an undefined
// offset unless a guard on the omitted lines prevents it.
162 return $this->_parts[count($this->_parts)-1];
// Fragment: overwrites the final part with $set.
// NOTE(review): with an empty list this writes to key -1 instead of creating part 0,
// unless a guard on the omitted lines prevents it.
172 $this->_parts[count($this->_parts)-1] = $set;
// Fragment: another read of the final part, with the same empty-list caveat (index -1).
182 return $this->_parts[count($this->_parts) -1];
// Fragment: $pos starts as the index one past the current last part, and $body is later
// stored at $pos.
// NOTE(review): lines 194-196 are not visible, so $pos may be adjusted in between.
193 $pos = count($this->_parts);
197 $this->_parts[$pos] = $body;
// Fragment: captures all arguments passed by the caller; how they are used is not visible here.
209 $args = func_get_args();
// Fragment: inserts $part at the front of the part list.
223 array_unshift($this->_parts, $part);
// Fragment: removes and returns the first part (array_shift() yields null on an empty list).
234 return array_shift($this->_parts);
// Fragment: takes the first part as the address, or null when there are no parts.
245 $address = count($this->_parts) ? $this->_parts[0] : null;
// A 17-byte value whose first byte compares equal to 0 is returned through s_encode_uuid();
// anything else is returned unchanged.
// NOTE(review): `$address[0] == 0` is a loose string-to-int comparison. Since PHP 8.0 an int
// compared with a non-numeric string is compared as strings, so a NUL byte no longer equals 0
// and only a leading "0" digit would satisfy the test. Presumably the intent is
// `$address[0] === "\0"` -- confirm.
// NOTE(review): when there are no parts, strlen() receives null, which is deprecated as of
// PHP 8.1.
247 return (strlen($address) == 17 && $address[0] == 0) ? $this->
s_encode_uuid($address) : $address;
// Puts $address onto the message via push(); a non-null $delim is handled first.
// NOTE(review): the bodies of both conditionals below are not visible in this excerpt.
258 public function wrap($address, $delim = null)
260 if ($delim !== null) {
// Special case for a 33-character address that starts with "@" (1 + 32 characters).
// NOTE(review): the fragment at 68 builds "@" . bin2hex($data), which for a 17-byte input is
// 35 characters long. If that is the encoder used at 247, this length test would not match
// its output -- confirm which lengths are intended.
263 if ($address[0] ==
'@' && strlen($address) == 33) {
// The address is handed to push().
266 $this->
push($address);
// Fragment: obtains an address from pop(); the enclosing definition is not visible here.
280 $address = $this->
pop();
// Fragment: builds a printable dump of the parts, starting with a separator line.
295 $string =
"--------------------------------------" . PHP_EOL;
// NOTE(review): $index is not used on any visible line of this loop.
296 foreach ($this->_parts as $index => $part) {
297 $len = strlen($part);
// Special case for a 17-byte part whose first byte compares equal to 0.
// NOTE(review): `$part[0] == 0` is a loose string-to-int comparison; since PHP 8.0 a NUL byte
// no longer compares equal to 0 this way, so the branch only triggers for a leading "0"
// digit. Presumably `$part[0] === "\0"` is intended -- confirm.
298 if ($len == 17 && $part[0] == 0) {
// The length is measured again, presumably because $part is replaced on the omitted line 299.
300 $len = strlen($part);
// One output line per part: zero-padded three-digit length in brackets, a space, the part,
// a space, then the platform line ending.
302 $string .= sprintf (
"[%03d] %s %s", $len, $part, PHP_EOL);
// Fragment of a self-test: a DEALER socket binds an in-process endpoint and a ROUTER socket
// connects to it, both on the same context.
317 $context =
new ZMQContext();
318 $output =
new ZMQSocket($context, ZMQ::SOCKET_DEALER);
319 $output->bind(
"inproc://zmsg_selftest");
321 $input =
new ZMQSocket($context, ZMQ::SOCKET_ROUTER);
322 $input->connect(
"inproc://zmsg_selftest");
// Case 1: a message with a single body on the sending side.
325 $zmsgo =
new Zmsg($output);
326 $zmsgo->body_set(
"Hello");
// NOTE(review): $result accumulates assert() return values, but its initialisation is not
// visible here. assert() returns true when assertions are disabled, and with PHP 8 defaults a
// failing assertion throws AssertionError rather than returning false, so $result may never
// become falsy -- confirm the intended PHP configuration.
327 $result &= assert($zmsgo->body() ==
"Hello");
// Receiving side; the send/receive calls themselves are on omitted lines.
330 $zmsgi =
new Zmsg($input);
// Two parts are expected for a one-part send; presumably the extra part is the identity frame
// the ROUTER socket prepends -- confirm.
332 $result &= assert($zmsgi->parts() == 2);
333 $result &= assert($zmsgi->body() ==
"Hello");
// Case 2: body wrapped with two addresses, the first one with an empty delimiter part.
337 $zmsgo =
new Zmsg($output);
338 $zmsgo->body_set(
"Hello");
339 $zmsgo->wrap(
"address1",
"");
340 $zmsgo->wrap(
"address2");
// body + empty delimiter + two addresses = 4 parts.
341 $result &= assert($zmsgo->parts() == 4);
345 $zmsgi =
new Zmsg($input);
// One more part than was sent is expected on the receiving side.
347 $result &= assert($zmsgi->parts() == 5);
349 $result &= assert($zmsgi->unwrap() ==
"address2");
// body_fmt() is given a format string plus arguments; the body is then expected to be "World".
351 $zmsgi->body_fmt(
"%s%s",
'W',
"orld");
352 $result &= assert($zmsgi->body() ==
"World");
// NOTE(review): the steps that reduce the message to a single part are on omitted lines.
356 $result &= assert($zmsgi->parts() == 1);
// Popping the remaining part must yield the body and leave the message empty.
359 $part = $zmsgi->pop();
360 $result &= assert ($part ==
"World");
361 $result &= assert($zmsgi->parts() == 0);
// Case 3: file round trip. NOTE(review): the creation of $zmsg and $zmsg2 is not visible here.
365 $zmsg->body_set(
"Hello");
366 $zmsg->wrap(
"address1",
"");
367 $zmsg->wrap(
"address2");
368 $result &= assert($zmsg->parts() == 4);
// The same temp-dir file is opened for writing and then for reading.
// NOTE(review): neither fopen() result is checked for false on the visible lines, and no
// fclose() is visible -- confirm on the omitted lines.
369 $fh = fopen(sys_get_temp_dir() .
"/zmsgtest.zmsg",
'w');
372 $fh = fopen(sys_get_temp_dir() .
"/zmsgtest.zmsg",
'r');
// NOTE(review): unlike the surrounding checks, this assertion is not folded into $result.
375 assert($zmsg2->last() == $zmsg->last());
377 $result &= assert($zmsg2->parts() == 4);
// Prints "OK" when $result is truthy, otherwise "FAIL", followed by a line ending.
378 echo ($result ?
"OK" :
"FAIL"), PHP_EOL;
// Fragment of the file writer: each part is preceded by a single length byte.
391 foreach ($this->_parts as $part) {
// NOTE(review): chr() takes its argument modulo 256, so a part longer than 255 bytes is
// written with a wrong length byte and cannot be read back correctly.
392 fwrite($fh, chr(strlen($part)));
// Non-empty parts get additional handling on the omitted line 394 (presumably the data write).
393 if (strlen($part) > 0) {
// Fragment of the file reader: reads one length byte per iteration until end of file.
// NOTE(review): the result of `$size = fread($fh, 1)` is tested for truthiness. A length byte
// of 48 is the string "0", which PHP treats as false, so a 48-byte part would end the loop
// early. Comparing explicitly against false / '' would avoid that -- confirm intent.
409 while (!feof($fh) && $size = fread($fh, 1)) {
// A zero length byte yields an empty part; otherwise that many bytes are read as the part.
// NOTE(review): fread() may return fewer bytes than requested (or false); not checked here.
410 $this->_parts[] = ord($size) > 0 ? fread($fh, ord($size)) :
'';