amqp_wire.inc 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698
  1. <?php
  2. /**
  3. * AMQP protocol serialization/deserialization to/from wire format.
  4. *
  5. * http://code.google.com/p/php-amqplib/
  6. * Vadim Zaliva <lord@crocodile.org>
  7. *
  8. *
  9. * To understand all signed/unsinged and 32/64 bit madness in this
  10. * code, please read first the following article:
  11. *
  12. * http://www.mysqlperformanceblog.com/2007/03/27/integers-in-php-running-with-scissors-and-portability/
  13. */
  14. require_once('hexdump.inc');
  15. /**
  16. * AMQP protocol decimal value.
  17. *
  18. * Values are represented as (n,e) pairs. The actual value
  19. * is n * 10^(-e).
  20. *
  21. * From 0.8 spec: Decimal values are
  22. * not intended to support floating point values, but rather
  23. * business values such as currency rates and amounts. The
  24. * 'decimals' octet is not signed.
  25. */
  26. class AMQPDecimal
  27. {
  28. public function __construct($n, $e)
  29. {
  30. if($e < 0)
  31. throw new Exception("Decimal exponent value must be unsigned!");
  32. $this->n = $n;
  33. $this->e = $e;
  34. }
  35. public function asBCvalue()
  36. {
  37. return bcdiv($this->n, bcpow(10,$this->e));
  38. }
  39. }
  40. class AMQPWriter
  41. {
  42. public function __construct()
  43. {
  44. $this->out = "";
  45. $this->bits = array();
  46. $this->bitcount = 0;
  47. }
  48. private static function chrbytesplit($x, $bytes)
  49. {
  50. return array_map('chr', AMQPWriter::bytesplit($x,$bytes));
  51. }
  52. /**
  53. * Splits number (could be either int or string) into array of byte
  54. * values (represented as integers) in big-endian byte order.
  55. */
  56. private static function bytesplit($x, $bytes)
  57. {
  58. if(is_int($x))
  59. {
  60. if($x<0)
  61. $x = sprintf("%u", $x);
  62. }
  63. $res = array();
  64. for($i=0;$i<$bytes;$i++)
  65. {
  66. $b = bcmod($x,'256');
  67. array_unshift($res,(int)$b);
  68. $x=bcdiv($x,'256', 0);
  69. }
  70. if($x!=0)
  71. throw new Exception("Value too big!");
  72. return $res;
  73. }
  74. private function flushbits()
  75. {
  76. if(count($this->bits))
  77. {
  78. $this->out .= implode("", array_map('chr',$this->bits));
  79. $this->bits = array();
  80. $this->bitcount = 0;
  81. }
  82. }
  83. /**
  84. * Get what's been encoded so far.
  85. */
  86. public function getvalue()
  87. {
  88. $this->flushbits();
  89. return $this->out;
  90. }
  91. /**
  92. * Write a plain Python string, with no special encoding.
  93. */
  94. public function write($s)
  95. {
  96. $this->flushbits();
  97. $this->out .= $s;
  98. }
  99. /**
  100. * Write a boolean value.
  101. */
  102. public function write_bit($b)
  103. {
  104. if($b)
  105. $b = 1;
  106. else
  107. $b = 0;
  108. $shift = $this->bitcount % 8;
  109. if($shift == 0)
  110. $last = 0;
  111. else
  112. $last = array_pop($this->bits);
  113. $last |= ($b << $shift);
  114. array_push($this->bits, $last);
  115. $this->bitcount += 1;
  116. }
  117. /**
  118. * Write an integer as an unsigned 8-bit value.
  119. */
  120. public function write_octet($n)
  121. {
  122. if($n < 0 || $n > 255)
  123. throw new Exception('Octet out of range 0..255');
  124. $this->flushbits();
  125. $this->out .= chr($n);
  126. }
  127. /**
  128. * Write an integer as an unsigned 16-bit value.
  129. */
  130. public function write_short($n)
  131. {
  132. if($n < 0 || $n > 65535)
  133. throw new Exception('Octet out of range 0..65535');
  134. $this->flushbits();
  135. $this->out .= pack('n', $n);
  136. }
  137. /**
  138. * Write an integer as an unsigned 32-bit value.
  139. */
  140. public function write_long($n)
  141. {
  142. $this->flushbits();
  143. $this->out .= implode("", AMQPWriter::chrbytesplit($n,4));
  144. }
  145. private function write_signed_long($n)
  146. {
  147. $this->flushbits();
  148. // although format spec for 'N' mentions unsigned
  149. // it will deal with sinned integers as well. tested.
  150. $this->out .= pack('N', $n);
  151. }
  152. /**
  153. * Write an integer as an unsigned 64-bit value.
  154. */
  155. public function write_longlong($n)
  156. {
  157. $this->flushbits();
  158. $this->out .= implode("", AMQPWriter::chrbytesplit($n,8));
  159. }
  160. /**
  161. * Write a string up to 255 bytes long after encoding.
  162. * Assume UTF-8 encoding.
  163. */
  164. public function write_shortstr($s)
  165. {
  166. $this->flushbits();
  167. if(strlen($s) > 255)
  168. throw new Exception('String too long');
  169. $this->write_octet(strlen($s));
  170. $this->out .= $s;
  171. }
  172. /*
  173. * Write a string up to 2**32 bytes long. Assume UTF-8 encoding.
  174. */
  175. public function write_longstr($s)
  176. {
  177. $this->flushbits();
  178. $this->write_long(strlen($s));
  179. $this->out .= $s;
  180. }
  181. /**
  182. * Write unix time_t value as 64 bit timestamp.
  183. */
  184. public function write_timestamp($v)
  185. {
  186. $this->write_longlong($v);
  187. }
  188. /**
  189. * Write PHP array, as table. Input array format: keys are strings,
  190. * values are (type,value) tuples.
  191. */
  192. public function write_table($d)
  193. {
  194. $this->flushbits();
  195. $table_data = new AMQPWriter();
  196. foreach($d as $k=>$va)
  197. {
  198. list($ftype,$v) = $va;
  199. $table_data->write_shortstr($k);
  200. if($ftype=='S')
  201. {
  202. $table_data->write('S');
  203. $table_data->write_longstr($v);
  204. } else if($ftype=='I')
  205. {
  206. $table_data->write('I');
  207. $table_data->write_signed_long($v);
  208. } else if($ftype=='D')
  209. {
  210. // 'D' type values are passed AMQPDecimal instances.
  211. $table_data->write('D');
  212. $table_data->write_octet($v->e);
  213. $table_data->write_signed_long($v->n);
  214. } else if($ftype=='T')
  215. {
  216. $table_data->write('T');
  217. $table_data->write_timestamp($v);
  218. } else if($ftype=='F')
  219. {
  220. $table_data->write('F');
  221. $table_data->write_table($v);
  222. }
  223. }
  224. $table_data = $table_data->getvalue();
  225. $this->write_long(strlen($table_data));
  226. $this->write($table_data);
  227. }
  228. }
  229. class AMQPReader
  230. {
  231. public function __construct($str, $sock=NULL)
  232. {
  233. $this->str = $str;
  234. if ($sock !== NULL)
  235. {
  236. $this->sock = new BufferedInput($sock);
  237. } else
  238. {
  239. $this->sock = NULL;
  240. }
  241. $this->offset = 0;
  242. $this->bitcount = $this->bits = 0;
  243. if(((int)4294967296)!=0)
  244. $this->is64bits = true;
  245. else
  246. $this->is64bits = false;
  247. if(!function_exists("bcmul"))
  248. throw new Exception("'bc math' module required");
  249. $this->buffer_read_timeout = 5; // in seconds
  250. }
  251. public function close()
  252. {
  253. if($this->sock)
  254. $this->sock->close();
  255. }
  256. public function read($n)
  257. {
  258. $this->bitcount = $this->bits = 0;
  259. return $this->rawread($n);
  260. }
  261. private function rawread($n)
  262. {
  263. if($this->sock)
  264. {
  265. $res = '';
  266. $read = 0;
  267. $start = time();
  268. while($read < $n && !feof($this->sock->real_sock()) &&
  269. (false !== ($buf = fread($this->sock->real_sock(), $n - $read))))
  270. {
  271. if ($buf == '')
  272. {
  273. usleep(100);
  274. }
  275. else
  276. $start = time();
  277. $read += strlen($buf);
  278. $res .= $buf;
  279. }
  280. if(strlen($res)!=$n)
  281. throw new Exception ("Error reading data. Recevived " .
  282. strlen($res) . " instead of expected $n bytes");
  283. $this->offset += $n;
  284. } else
  285. {
  286. if(strlen($this->str) < $n)
  287. throw new Exception ("Error reading data. Requested $n bytes while string buffer has only " .
  288. strlen($this->str));
  289. $res = substr($this->str,0,$n);
  290. $this->str = substr($this->str,$n);
  291. $this->offset += $n;
  292. }
  293. return $res;
  294. }
  295. public function read_bit()
  296. {
  297. if(!$this->bitcount)
  298. {
  299. $this->bits = ord($this->rawread(1));
  300. $this->bitcount = 8;
  301. }
  302. $result = ($this->bits & 1) == 1;
  303. $this->bits >>= 1;
  304. $this->bitcount -= 1;
  305. return $result;
  306. }
  307. public function read_octet()
  308. {
  309. $this->bitcount = $this->bits = 0;
  310. list(,$res) = unpack('C', $this->rawread(1));
  311. return $res;
  312. }
  313. public function read_short()
  314. {
  315. $this->bitcount = $this->bits = 0;
  316. list(,$res) = unpack('n', $this->rawread(2));
  317. return $res;
  318. }
  319. /**
  320. * Reads 32 bit integer in big-endian byte order.
  321. *
  322. * On 64 bit systems it will return always usngined int
  323. * value in 0..2^32 range.
  324. *
  325. * On 32 bit systems it will return signed int value in
  326. * -2^31...+2^31 range.
  327. *
  328. * Use with caution!
  329. */
  330. public function read_php_int()
  331. {
  332. list(,$res) = unpack('N', $this->rawread(4));
  333. if($this->is64bits)
  334. {
  335. $sres = sprintf ( "%u", $res );
  336. return (int)$sres;
  337. } else {
  338. return $res;
  339. }
  340. }
  341. // PHP does not have unsigned 32 bit int,
  342. // so we return it as a string
  343. public function read_long()
  344. {
  345. $this->bitcount = $this->bits = 0;
  346. list(,$res) = unpack('N', $this->rawread(4));
  347. $sres = sprintf ( "%u", $res );
  348. return $sres;
  349. }
  350. private function read_signed_long()
  351. {
  352. $this->bitcount = $this->bits = 0;
  353. // In PHP unpack('N') always return signed value,
  354. // on both 32 and 64 bit systems!
  355. list(,$res) = unpack('N', $this->rawread(4));
  356. return $res;
  357. }
  358. // Even on 64 bit systems PHP integers are singed.
  359. // Since we need an unsigned value here we return it
  360. // as a string.
  361. public function read_longlong()
  362. {
  363. $this->bitcount = $this->bits = 0;
  364. $hi = unpack('N', $this->rawread(4));
  365. $lo = unpack('N', $this->rawread(4));
  366. // workaround signed/unsigned braindamage in php
  367. $hi = sprintf ( "%u", $hi[1] );
  368. $lo = sprintf ( "%u", $lo[1] );
  369. return bcadd(bcmul($hi, "4294967296" ), $lo);
  370. }
  371. /**
  372. * Read a utf-8 encoded string that's stored in up to
  373. * 255 bytes. Return it decoded as a Python unicode object.
  374. */
  375. public function read_shortstr()
  376. {
  377. $this->bitcount = $this->bits = 0;
  378. list(,$slen) = unpack('C', $this->rawread(1));
  379. return $this->rawread($slen);
  380. }
  381. /**
  382. * Read a string that's up to 2**32 bytes, the encoding
  383. * isn't specified in the AMQP spec, so just return it as
  384. * a plain PHP string.
  385. */
  386. public function read_longstr()
  387. {
  388. $this->bitcount = $this->bits = 0;
  389. $slen = $this->read_php_int();
  390. if($slen<0)
  391. throw new Exception("Strings longer than supported on this platform");
  392. return $this->rawread($slen);
  393. }
  394. /**
  395. * Read and AMQP timestamp, which is a 64-bit integer representing
  396. * seconds since the Unix epoch in 1-second resolution.
  397. */
  398. function read_timestamp()
  399. {
  400. return $this->read_longlong();
  401. }
  402. /**
  403. * Read an AMQP table, and return as a PHP array. keys are strings,
  404. * values are (type,value) tuples.
  405. */
  406. public function read_table()
  407. {
  408. $this->bitcount = $this->bits = 0;
  409. $tlen = $this->read_php_int();
  410. if($tlen<0)
  411. throw new Exception("Table is longer than supported");
  412. $table_data = new AMQPReader($this->rawread($tlen));
  413. $result = array();
  414. while($table_data->tell() < $tlen)
  415. {
  416. $name = $table_data->read_shortstr();
  417. $ftype = $table_data->rawread(1);
  418. if($ftype == 'S') {
  419. $val = $table_data->read_longstr();
  420. } else if($ftype == 'I') {
  421. $val = $table_data->read_signed_long();
  422. } else if($ftype == 'D')
  423. {
  424. $e = $table_data->read_octet();
  425. $n = $table_data->read_signed_long();
  426. $val = new AMQPDecimal($n, $e);
  427. } else if($ftype == 'T')
  428. {
  429. $val = $table_data->read_timestamp();
  430. } else if($ftype == 'F')
  431. {
  432. $val = $table_data->read_table(); // recursion
  433. } else {
  434. error_log("Usupported table field type $ftype");
  435. $val = NULL;
  436. }
  437. $result[$name] = array($ftype,$val);
  438. }
  439. return $result;
  440. }
  441. protected function tell()
  442. {
  443. return $this->offset;
  444. }
  445. }
  446. /**
  447. * Abstract base class for AMQP content. Subclasses should override
  448. * the PROPERTIES attribute.
  449. */
  450. class GenericContent
  451. {
  452. protected static $PROPERTIES = array(
  453. "dummy" => "shortstr"
  454. );
  455. public function __construct($props, $prop_types=NULL)
  456. {
  457. if($prop_types)
  458. $this->prop_types = $prop_types;
  459. else
  460. $this->prop_types = GenericContent::$PROPERTIES;
  461. $d = array();
  462. if ($props)
  463. $d = array_intersect_key($props, $this->prop_types);
  464. else
  465. $d = array();
  466. $this->properties = $d;
  467. }
  468. /**
  469. * Look for additional properties in the 'properties' dictionary,
  470. * and if present - the 'delivery_info' dictionary.
  471. */
  472. public function get($name)
  473. {
  474. if(array_key_exists($name,$this->properties))
  475. return $this->properties[$name];
  476. if(isset($this->delivery_info))
  477. if(array_key_exists($name,$this->delivery_info))
  478. return $this->delivery_info[$name];
  479. throw new Exception("No such property");
  480. }
  481. /**
  482. * Given the raw bytes containing the property-flags and
  483. * property-list from a content-frame-header, parse and insert
  484. * into a dictionary stored in this object as an attribute named
  485. * 'properties'.
  486. */
  487. public function load_properties($raw_bytes)
  488. {
  489. $r = new AMQPReader($raw_bytes);
  490. // Read 16-bit shorts until we get one with a low bit set to zero
  491. $flags = array();
  492. while(true)
  493. {
  494. $flag_bits = $r->read_short();
  495. array_push($flags, $flag_bits);
  496. if(($flag_bits & 1) == 0)
  497. break;
  498. }
  499. $shift = 0;
  500. $d = array();
  501. foreach ($this->prop_types as $key => $proptype)
  502. {
  503. if($shift == 0) {
  504. if(!$flags) {
  505. break;
  506. }
  507. $flag_bits = array_shift($flags);
  508. $shift = 15;
  509. }
  510. if($flag_bits & (1 << $shift))
  511. $d[$key] = call_user_func(array($r,"read_".$proptype));
  512. $shift -= 1;
  513. }
  514. $this->properties = $d;
  515. }
  516. /**
  517. * serialize the 'properties' attribute (a dictionary) into the
  518. * raw bytes making up a set of property flags and a property
  519. * list, suitable for putting into a content frame header.
  520. */
  521. public function serialize_properties()
  522. {
  523. $shift = 15;
  524. $flag_bits = 0;
  525. $flags = array();
  526. $raw_bytes = new AMQPWriter();
  527. foreach ($this->prop_types as $key => $proptype)
  528. {
  529. if(array_key_exists($key,$this->properties))
  530. $val = $this->properties[$key];
  531. else
  532. $val = NULL;
  533. if($val != NULL)
  534. {
  535. if($shift == 0)
  536. {
  537. array_push($flags, $flag_bits);
  538. $flag_bits = 0;
  539. $shift = 15;
  540. }
  541. $flag_bits |= (1 << $shift);
  542. if($proptype != "bit")
  543. call_user_func(array($raw_bytes, "write_" . $proptype),
  544. $val);
  545. }
  546. $shift -= 1;
  547. }
  548. array_push($flags, $flag_bits);
  549. $result = new AMQPWriter();
  550. foreach($flags as $flag_bits)
  551. $result->write_short($flag_bits);
  552. $result->write($raw_bytes->getvalue());
  553. return $result->getvalue();
  554. }
  555. }
  556. class BufferedInput
  557. {
  558. public function __construct($sock)
  559. {
  560. $this->block_size = 8192;
  561. $this->sock = $sock;
  562. $this->reset("");
  563. }
  564. public function real_sock()
  565. {
  566. return $this->sock;
  567. }
  568. public function read($n)
  569. {
  570. if ($this->offset >= strlen($this->buffer))
  571. {
  572. if (!($rv = $this->populate_buffer()))
  573. {
  574. return $rv;
  575. }
  576. }
  577. return $this->read_buffer($n);
  578. }
  579. public function close()
  580. {
  581. fclose($this->sock);
  582. $this->reset("");
  583. }
  584. private function read_buffer($n)
  585. {
  586. $n = min($n, strlen($this->buffer) - $this->offset);
  587. if ($n === 0)
  588. {
  589. // substr("", 0, 0) => FALSE, which screws up read loops that are
  590. // expecting non-blocking reads to return "". This avoids that edge
  591. // case when the buffer is empty/used up.
  592. return "";
  593. }
  594. $block = substr($this->buffer, $this->offset, $n);
  595. $this->offset += $n;
  596. return $block;
  597. }
  598. private function reset($block)
  599. {
  600. $this->buffer = $block;
  601. $this->offset = 0;
  602. }
  603. private function populate_buffer()
  604. {
  605. if(feof($this->sock))
  606. {
  607. $this->reset("");
  608. return FALSE;
  609. }
  610. $block = fread($this->sock, $this->block_size);
  611. if ($block !== FALSE)
  612. {
  613. $this->reset($block);
  614. return TRUE;
  615. } else
  616. {
  617. return $block;
  618. }
  619. }
  620. }
  621. ?>