123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698 |
- <?php
- /**
- * AMQP protocol serialization/deserialization to/from wire format.
- *
- * http://code.google.com/p/php-amqplib/
- * Vadim Zaliva <lord@crocodile.org>
- *
- *
- * To understand all signed/unsinged and 32/64 bit madness in this
- * code, please read first the following article:
- *
- * http://www.mysqlperformanceblog.com/2007/03/27/integers-in-php-running-with-scissors-and-portability/
- */
- require_once('hexdump.inc');
- /**
- * AMQP protocol decimal value.
- *
- * Values are represented as (n,e) pairs. The actual value
- * is n * 10^(-e).
- *
- * From 0.8 spec: Decimal values are
- * not intended to support floating point values, but rather
- * business values such as currency rates and amounts. The
- * 'decimals' octet is not signed.
- */
- class AMQPDecimal
- {
- public function __construct($n, $e)
- {
- if($e < 0)
- throw new Exception("Decimal exponent value must be unsigned!");
- $this->n = $n;
- $this->e = $e;
- }
- public function asBCvalue()
- {
- return bcdiv($this->n, bcpow(10,$this->e));
- }
- }
- class AMQPWriter
- {
- public function __construct()
- {
- $this->out = "";
- $this->bits = array();
- $this->bitcount = 0;
- }
- private static function chrbytesplit($x, $bytes)
- {
- return array_map('chr', AMQPWriter::bytesplit($x,$bytes));
- }
- /**
- * Splits number (could be either int or string) into array of byte
- * values (represented as integers) in big-endian byte order.
- */
- private static function bytesplit($x, $bytes)
- {
- if(is_int($x))
- {
- if($x<0)
- $x = sprintf("%u", $x);
- }
-
- $res = array();
- for($i=0;$i<$bytes;$i++)
- {
- $b = bcmod($x,'256');
- array_unshift($res,(int)$b);
- $x=bcdiv($x,'256', 0);
- }
- if($x!=0)
- throw new Exception("Value too big!");
- return $res;
- }
-
- private function flushbits()
- {
- if(count($this->bits))
- {
- $this->out .= implode("", array_map('chr',$this->bits));
- $this->bits = array();
- $this->bitcount = 0;
- }
- }
- /**
- * Get what's been encoded so far.
- */
- public function getvalue()
- {
- $this->flushbits();
- return $this->out;
- }
- /**
- * Write a plain Python string, with no special encoding.
- */
- public function write($s)
- {
- $this->flushbits();
- $this->out .= $s;
- }
- /**
- * Write a boolean value.
- */
- public function write_bit($b)
- {
- if($b)
- $b = 1;
- else
- $b = 0;
- $shift = $this->bitcount % 8;
- if($shift == 0)
- $last = 0;
- else
- $last = array_pop($this->bits);
-
- $last |= ($b << $shift);
- array_push($this->bits, $last);
-
- $this->bitcount += 1;
- }
- /**
- * Write an integer as an unsigned 8-bit value.
- */
- public function write_octet($n)
- {
- if($n < 0 || $n > 255)
- throw new Exception('Octet out of range 0..255');
- $this->flushbits();
- $this->out .= chr($n);
- }
- /**
- * Write an integer as an unsigned 16-bit value.
- */
- public function write_short($n)
- {
- if($n < 0 || $n > 65535)
- throw new Exception('Octet out of range 0..65535');
- $this->flushbits();
- $this->out .= pack('n', $n);
- }
- /**
- * Write an integer as an unsigned 32-bit value.
- */
- public function write_long($n)
- {
- $this->flushbits();
- $this->out .= implode("", AMQPWriter::chrbytesplit($n,4));
- }
- private function write_signed_long($n)
- {
- $this->flushbits();
- // although format spec for 'N' mentions unsigned
- // it will deal with sinned integers as well. tested.
- $this->out .= pack('N', $n);
- }
- /**
- * Write an integer as an unsigned 64-bit value.
- */
- public function write_longlong($n)
- {
- $this->flushbits();
- $this->out .= implode("", AMQPWriter::chrbytesplit($n,8));
- }
- /**
- * Write a string up to 255 bytes long after encoding.
- * Assume UTF-8 encoding.
- */
- public function write_shortstr($s)
- {
- $this->flushbits();
- if(strlen($s) > 255)
- throw new Exception('String too long');
- $this->write_octet(strlen($s));
- $this->out .= $s;
- }
- /*
- * Write a string up to 2**32 bytes long. Assume UTF-8 encoding.
- */
- public function write_longstr($s)
- {
- $this->flushbits();
- $this->write_long(strlen($s));
- $this->out .= $s;
- }
- /**
- * Write unix time_t value as 64 bit timestamp.
- */
- public function write_timestamp($v)
- {
- $this->write_longlong($v);
- }
- /**
- * Write PHP array, as table. Input array format: keys are strings,
- * values are (type,value) tuples.
- */
- public function write_table($d)
- {
- $this->flushbits();
- $table_data = new AMQPWriter();
- foreach($d as $k=>$va)
- {
- list($ftype,$v) = $va;
- $table_data->write_shortstr($k);
- if($ftype=='S')
- {
- $table_data->write('S');
- $table_data->write_longstr($v);
- } else if($ftype=='I')
- {
- $table_data->write('I');
- $table_data->write_signed_long($v);
- } else if($ftype=='D')
- {
- // 'D' type values are passed AMQPDecimal instances.
- $table_data->write('D');
- $table_data->write_octet($v->e);
- $table_data->write_signed_long($v->n);
- } else if($ftype=='T')
- {
- $table_data->write('T');
- $table_data->write_timestamp($v);
- } else if($ftype=='F')
- {
- $table_data->write('F');
- $table_data->write_table($v);
- }
- }
- $table_data = $table_data->getvalue();
- $this->write_long(strlen($table_data));
- $this->write($table_data);
- }
- }
- class AMQPReader
- {
- public function __construct($str, $sock=NULL)
- {
- $this->str = $str;
- if ($sock !== NULL)
- {
- $this->sock = new BufferedInput($sock);
- } else
- {
- $this->sock = NULL;
- }
- $this->offset = 0;
- $this->bitcount = $this->bits = 0;
- if(((int)4294967296)!=0)
- $this->is64bits = true;
- else
- $this->is64bits = false;
- if(!function_exists("bcmul"))
- throw new Exception("'bc math' module required");
-
- $this->buffer_read_timeout = 5; // in seconds
- }
- public function close()
- {
- if($this->sock)
- $this->sock->close();
- }
- public function read($n)
- {
- $this->bitcount = $this->bits = 0;
- return $this->rawread($n);
- }
-
- private function rawread($n)
- {
- if($this->sock)
- {
- $res = '';
- $read = 0;
-
- $start = time();
- while($read < $n && !feof($this->sock->real_sock()) &&
- (false !== ($buf = fread($this->sock->real_sock(), $n - $read))))
- {
- if ($buf == '')
- {
- usleep(100);
- }
- else
- $start = time();
-
- $read += strlen($buf);
- $res .= $buf;
- }
- if(strlen($res)!=$n)
- throw new Exception ("Error reading data. Recevived " .
- strlen($res) . " instead of expected $n bytes");
- $this->offset += $n;
- } else
- {
- if(strlen($this->str) < $n)
- throw new Exception ("Error reading data. Requested $n bytes while string buffer has only " .
- strlen($this->str));
- $res = substr($this->str,0,$n);
- $this->str = substr($this->str,$n);
- $this->offset += $n;
- }
- return $res;
- }
- public function read_bit()
- {
- if(!$this->bitcount)
- {
- $this->bits = ord($this->rawread(1));
- $this->bitcount = 8;
- }
- $result = ($this->bits & 1) == 1;
- $this->bits >>= 1;
- $this->bitcount -= 1;
- return $result;
- }
- public function read_octet()
- {
- $this->bitcount = $this->bits = 0;
- list(,$res) = unpack('C', $this->rawread(1));
- return $res;
- }
- public function read_short()
- {
- $this->bitcount = $this->bits = 0;
- list(,$res) = unpack('n', $this->rawread(2));
- return $res;
- }
- /**
- * Reads 32 bit integer in big-endian byte order.
- *
- * On 64 bit systems it will return always usngined int
- * value in 0..2^32 range.
- *
- * On 32 bit systems it will return signed int value in
- * -2^31...+2^31 range.
- *
- * Use with caution!
- */
- public function read_php_int()
- {
- list(,$res) = unpack('N', $this->rawread(4));
- if($this->is64bits)
- {
- $sres = sprintf ( "%u", $res );
- return (int)$sres;
- } else {
- return $res;
- }
- }
-
- // PHP does not have unsigned 32 bit int,
- // so we return it as a string
- public function read_long()
- {
- $this->bitcount = $this->bits = 0;
- list(,$res) = unpack('N', $this->rawread(4));
- $sres = sprintf ( "%u", $res );
- return $sres;
- }
- private function read_signed_long()
- {
- $this->bitcount = $this->bits = 0;
- // In PHP unpack('N') always return signed value,
- // on both 32 and 64 bit systems!
- list(,$res) = unpack('N', $this->rawread(4));
- return $res;
- }
- // Even on 64 bit systems PHP integers are singed.
- // Since we need an unsigned value here we return it
- // as a string.
- public function read_longlong()
- {
- $this->bitcount = $this->bits = 0;
- $hi = unpack('N', $this->rawread(4));
- $lo = unpack('N', $this->rawread(4));
- // workaround signed/unsigned braindamage in php
- $hi = sprintf ( "%u", $hi[1] );
- $lo = sprintf ( "%u", $lo[1] );
-
- return bcadd(bcmul($hi, "4294967296" ), $lo);
- }
- /**
- * Read a utf-8 encoded string that's stored in up to
- * 255 bytes. Return it decoded as a Python unicode object.
- */
- public function read_shortstr()
- {
- $this->bitcount = $this->bits = 0;
- list(,$slen) = unpack('C', $this->rawread(1));
- return $this->rawread($slen);
- }
-
- /**
- * Read a string that's up to 2**32 bytes, the encoding
- * isn't specified in the AMQP spec, so just return it as
- * a plain PHP string.
- */
- public function read_longstr()
- {
- $this->bitcount = $this->bits = 0;
- $slen = $this->read_php_int();
- if($slen<0)
- throw new Exception("Strings longer than supported on this platform");
- return $this->rawread($slen);
- }
-
- /**
- * Read and AMQP timestamp, which is a 64-bit integer representing
- * seconds since the Unix epoch in 1-second resolution.
- */
- function read_timestamp()
- {
- return $this->read_longlong();
- }
- /**
- * Read an AMQP table, and return as a PHP array. keys are strings,
- * values are (type,value) tuples.
- */
- public function read_table()
- {
- $this->bitcount = $this->bits = 0;
- $tlen = $this->read_php_int();
- if($tlen<0)
- throw new Exception("Table is longer than supported");
- $table_data = new AMQPReader($this->rawread($tlen));
- $result = array();
- while($table_data->tell() < $tlen)
- {
- $name = $table_data->read_shortstr();
- $ftype = $table_data->rawread(1);
- if($ftype == 'S') {
- $val = $table_data->read_longstr();
- } else if($ftype == 'I') {
- $val = $table_data->read_signed_long();
- } else if($ftype == 'D')
- {
- $e = $table_data->read_octet();
- $n = $table_data->read_signed_long();
- $val = new AMQPDecimal($n, $e);
- } else if($ftype == 'T')
- {
- $val = $table_data->read_timestamp();
- } else if($ftype == 'F')
- {
- $val = $table_data->read_table(); // recursion
- } else {
- error_log("Usupported table field type $ftype");
- $val = NULL;
- }
- $result[$name] = array($ftype,$val);
- }
- return $result;
- }
-
- protected function tell()
- {
- return $this->offset;
- }
-
- }
- /**
- * Abstract base class for AMQP content. Subclasses should override
- * the PROPERTIES attribute.
- */
- class GenericContent
- {
- protected static $PROPERTIES = array(
- "dummy" => "shortstr"
- );
- public function __construct($props, $prop_types=NULL)
- {
- if($prop_types)
- $this->prop_types = $prop_types;
- else
- $this->prop_types = GenericContent::$PROPERTIES;
- $d = array();
- if ($props)
- $d = array_intersect_key($props, $this->prop_types);
- else
- $d = array();
- $this->properties = $d;
- }
- /**
- * Look for additional properties in the 'properties' dictionary,
- * and if present - the 'delivery_info' dictionary.
- */
- public function get($name)
- {
- if(array_key_exists($name,$this->properties))
- return $this->properties[$name];
-
- if(isset($this->delivery_info))
- if(array_key_exists($name,$this->delivery_info))
- return $this->delivery_info[$name];
-
- throw new Exception("No such property");
- }
- /**
- * Given the raw bytes containing the property-flags and
- * property-list from a content-frame-header, parse and insert
- * into a dictionary stored in this object as an attribute named
- * 'properties'.
- */
- public function load_properties($raw_bytes)
- {
- $r = new AMQPReader($raw_bytes);
- // Read 16-bit shorts until we get one with a low bit set to zero
- $flags = array();
- while(true)
- {
- $flag_bits = $r->read_short();
- array_push($flags, $flag_bits);
- if(($flag_bits & 1) == 0)
- break;
- }
- $shift = 0;
- $d = array();
- foreach ($this->prop_types as $key => $proptype)
- {
- if($shift == 0) {
- if(!$flags) {
- break;
- }
- $flag_bits = array_shift($flags);
- $shift = 15;
- }
- if($flag_bits & (1 << $shift))
- $d[$key] = call_user_func(array($r,"read_".$proptype));
- $shift -= 1;
- }
- $this->properties = $d;
- }
- /**
- * serialize the 'properties' attribute (a dictionary) into the
- * raw bytes making up a set of property flags and a property
- * list, suitable for putting into a content frame header.
- */
- public function serialize_properties()
- {
- $shift = 15;
- $flag_bits = 0;
- $flags = array();
- $raw_bytes = new AMQPWriter();
- foreach ($this->prop_types as $key => $proptype)
- {
- if(array_key_exists($key,$this->properties))
- $val = $this->properties[$key];
- else
- $val = NULL;
- if($val != NULL)
- {
- if($shift == 0)
- {
- array_push($flags, $flag_bits);
- $flag_bits = 0;
- $shift = 15;
- }
-
- $flag_bits |= (1 << $shift);
- if($proptype != "bit")
- call_user_func(array($raw_bytes, "write_" . $proptype),
- $val);
- }
- $shift -= 1;
- }
- array_push($flags, $flag_bits);
- $result = new AMQPWriter();
- foreach($flags as $flag_bits)
- $result->write_short($flag_bits);
- $result->write($raw_bytes->getvalue());
-
- return $result->getvalue();
- }
- }
- class BufferedInput
- {
- public function __construct($sock)
- {
- $this->block_size = 8192;
- $this->sock = $sock;
- $this->reset("");
-
- }
-
- public function real_sock()
- {
- return $this->sock;
- }
-
- public function read($n)
- {
- if ($this->offset >= strlen($this->buffer))
- {
- if (!($rv = $this->populate_buffer()))
- {
- return $rv;
- }
- }
- return $this->read_buffer($n);
- }
-
- public function close()
- {
- fclose($this->sock);
- $this->reset("");
- }
-
- private function read_buffer($n)
- {
- $n = min($n, strlen($this->buffer) - $this->offset);
- if ($n === 0)
- {
- // substr("", 0, 0) => FALSE, which screws up read loops that are
- // expecting non-blocking reads to return "". This avoids that edge
- // case when the buffer is empty/used up.
- return "";
- }
- $block = substr($this->buffer, $this->offset, $n);
- $this->offset += $n;
- return $block;
- }
-
- private function reset($block)
- {
- $this->buffer = $block;
- $this->offset = 0;
- }
-
- private function populate_buffer()
- {
- if(feof($this->sock))
- {
- $this->reset("");
- return FALSE;
- }
-
- $block = fread($this->sock, $this->block_size);
- if ($block !== FALSE)
- {
- $this->reset($block);
- return TRUE;
- } else
- {
- return $block;
- }
- }
- }
- ?>
|