amqp.inc 45 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573
  1. <?php
  2. /**
  3. * Simple AMQP client library for AMQP for protocol version 0.8
  4. *
  5. * http://code.google.com/p/php-amqplib/
  6. * Vadim Zaliva <lord@crocodile.org>
  7. *
  8. */
  9. require_once('amqp_wire.inc');
  10. require_once('hexdump.inc');
  /**
   * Print a diagnostic message on standard output, terminated by a newline.
   *
   * @param string $s text to print
   */
  function debug_msg($s)
  {
      echo $s . "\n";
  }
  /**
   * Normalise a method signature to its "class_id,method_id" string form.
   *
   * A string argument is returned untouched; any other value is indexed
   * as a two-element (class_id, method_id) pair and formatted with "%d,%d".
   *
   * @param string|array $a signature as a string or as a pair
   * @return string
   */
  function methodSig($a)
  {
      if (!is_string($a)) {
          return sprintf("%d,%d", $a[0], $a[1]);
      }

      return $a;
  }
  /**
   * Base class for errors reported by the AMQP peer.
   *
   * The reply text becomes the exception message and the reply code the
   * exception code; the same values are also exposed through the public
   * properties below, which existing callers may read.
   */
  class AMQPException extends Exception
  {
      // These used to be created on the fly inside the constructor. They are
      // declared explicitly (and stay public) because creating dynamic
      // properties is deprecated as of PHP 8.2.

      /** Reply code sent by the peer; duplicates getCode(), kept for BC. */
      public $amqp_reply_code;

      /** Reply text sent by the peer; duplicates getMessage(), kept for BC. */
      public $amqp_reply_text;

      /** Signature (as passed to the constructor) of the method that failed. */
      public $amqp_method_sig;

      /** array(reply_code, reply_text, method_sig, method_name). */
      public $args;

      /**
       * @param int          $reply_code AMQP reply code
       * @param string       $reply_text human readable reply text
       * @param string|array $method_sig "class,method" string or (class_id, method_id) pair
       */
      public function __construct($reply_code, $reply_text, $method_sig)
      {
          parent::__construct($reply_text, $reply_code);

          $this->amqp_reply_code = $reply_code; // redundant, but kept for BC
          $this->amqp_reply_text = $reply_text; // redundant, but kept for BC
          $this->amqp_method_sig = $method_sig;

          // Resolve the readable method name; unknown signatures map to "".
          $ms = methodSig($method_sig);
          if (array_key_exists($ms, AbstractChannel::$GLOBAL_METHOD_NAMES)) {
              $mn = AbstractChannel::$GLOBAL_METHOD_NAMES[$ms];
          } else {
              $mn = "";
          }

          $this->args = array(
              $reply_code,
              $reply_text,
              $method_sig,
              $mn
          );
      }
  }
  /**
   * Raised when the server closes the whole connection.
   *
   * Construction is inherited unchanged from AMQPException:
   * ($reply_code, $reply_text, $method_sig).
   */
  class AMQPConnectionException extends AMQPException
  {
  }
  /**
   * Raised when the server closes a single channel.
   *
   * Construction is inherited unchanged from AMQPException:
   * ($reply_code, $reply_text, $method_sig).
   */
  class AMQPChannelException extends AMQPException
  {
  }
  /**
   * Behaviour shared by the connection (which registers itself as channel 0)
   * and by ordinary channels: a per-channel frame queue and method queue,
   * assembly of message content from header/body frames, and dispatch of
   * incoming methods to the handler named in the subclass's $method_map.
   */
  class AbstractChannel
  {
      // Methods that are followed by a content header and body frames
      private static $CONTENT_METHODS = array(
          "60,60", // Basic.deliver
          "60,71", // Basic.get_ok
      );

      // Methods that wait() dispatches at once even when not asked for
      private static $CLOSE_METHODS = array(
          "10,60", // Connection.close
          "20,40", // Channel.close
      );

      // All the method names
      public static $GLOBAL_METHOD_NAMES = array(
          "10,10" => "Connection.start",
          "10,11" => "Connection.start_ok",
          "10,20" => "Connection.secure",
          "10,21" => "Connection.secure_ok",
          "10,30" => "Connection.tune",
          "10,31" => "Connection.tune_ok",
          "10,40" => "Connection.open",
          "10,41" => "Connection.open_ok",
          "10,50" => "Connection.redirect",
          "10,60" => "Connection.close",
          "10,61" => "Connection.close_ok",
          "20,10" => "Channel.open",
          "20,11" => "Channel.open_ok",
          "20,20" => "Channel.flow",
          "20,21" => "Channel.flow_ok",
          "20,30" => "Channel.alert",
          "20,40" => "Channel.close",
          "20,41" => "Channel.close_ok",
          "30,10" => "Channel.access_request",
          "30,11" => "Channel.access_request_ok",
          "40,10" => "Channel.exchange_declare",
          "40,11" => "Channel.exchange_declare_ok",
          "40,20" => "Channel.exchange_delete",
          "40,21" => "Channel.exchange_delete_ok",
          "50,10" => "Channel.queue_declare",
          "50,11" => "Channel.queue_declare_ok",
          "50,20" => "Channel.queue_bind",
          "50,21" => "Channel.queue_bind_ok",
          "50,30" => "Channel.queue_purge",
          "50,31" => "Channel.queue_purge_ok",
          "50,40" => "Channel.queue_delete",
          "50,41" => "Channel.queue_delete_ok",
          "50,50" => "Channel.queue_unbind",
          "50,51" => "Channel.queue_unbind_ok",
          "60,10" => "Channel.basic_qos",
          "60,11" => "Channel.basic_qos_ok",
          "60,20" => "Channel.basic_consume",
          "60,21" => "Channel.basic_consume_ok",
          "60,30" => "Channel.basic_cancel",
          "60,31" => "Channel.basic_cancel_ok",
          "60,40" => "Channel.basic_publish",
          "60,50" => "Channel.basic_return",
          "60,60" => "Channel.basic_deliver",
          "60,70" => "Channel.basic_get",
          "60,71" => "Channel.basic_get_ok",
          "60,72" => "Channel.basic_get_empty",
          "60,80" => "Channel.basic_ack",
          "60,90" => "Channel.basic_reject",
          "60,100" => "Channel.basic_recover",
          "90,10" => "Channel.tx_select",
          "90,11" => "Channel.tx_select_ok",
          "90,20" => "Channel.tx_commit",
          "90,21" => "Channel.tx_commit_ok",
          "90,30" => "Channel.tx_rollback",
          "90,31" => "Channel.tx_rollback_ok"
      );

      protected $debug;

      // The members below used to be created on the fly by the constructor.
      // They are declared explicitly, with the public visibility they had as
      // dynamic properties, because dynamic properties are deprecated as of
      // PHP 8.2.
      public $connection;
      public $channel_id;
      public $frame_queue = array();  // Lower level queue for frames
      public $method_queue = array(); // Higher level queue for methods
      public $auto_decode = false;

      /**
       * Register this object as channel $channel_id of $connection.
       *
       * @param object $connection owner; its `channels` array is updated
       * @param int    $channel_id channel number
       */
      public function __construct($connection, $channel_id)
      {
          $this->connection = $connection;
          $this->channel_id = $channel_id;
          $connection->channels[$channel_id] = $this;
          $this->frame_queue = array();
          $this->method_queue = array();
          $this->auto_decode = false;
          $this->debug = defined('AMQP_DEBUG') ? AMQP_DEBUG : false;
      }

      public function getChannelId()
      {
          return $this->channel_id;
      }

      /**
       * Readable name for a method signature, or "" when it is not listed in
       * $GLOBAL_METHOD_NAMES (avoids an undefined-index notice in debug output).
       */
      private static function method_name($method_sig)
      {
          $key = methodSig($method_sig);
          return isset(self::$GLOBAL_METHOD_NAMES[$key]) ? self::$GLOBAL_METHOD_NAMES[$key] : "";
      }

      /**
       * Call the handler registered in $this->method_map for $method_sig.
       *
       * @param string $method_sig "class,method" signature
       * @param mixed  $args       argument reader handed to the handler
       * @param mixed  $content    message content, or NULL when there is none
       * @return mixed whatever the handler returns
       * @throws Exception when the signature has no handler
       */
      function dispatch($method_sig, $args, $content)
      {
          if (!array_key_exists($method_sig, $this->method_map)) {
              throw new Exception("Unknown AMQP method $method_sig");
          }

          $amqp_method = $this->method_map[$method_sig];
          if ($content == NULL) {
              return call_user_func(array($this, $amqp_method), $args);
          }

          return call_user_func(array($this, $amqp_method), $args, $content);
      }

      /**
       * Return the next frame for this channel as array(frame_type, payload).
       *
       * Frames parked in $frame_queue are served first, oldest first: they
       * are appended with array_push(), so they must be taken from the front
       * to keep the order in which they arrived.
       */
      function next_frame()
      {
          if ($this->debug) {
              debug_msg("waiting for a new frame");
          }

          if (!empty($this->frame_queue)) {
              return array_shift($this->frame_queue);
          }

          return $this->connection->wait_channel($this->channel_id);
      }

      protected function send_method_frame($method_sig, $args="")
      {
          $this->connection->send_channel_method_frame($this->channel_id, $method_sig, $args);
      }

      /**
       * Convert $body from character set $encoding to UTF-8 with iconv.
       *
       * The body is returned unchanged when iconv is not available or the
       * conversion fails (unknown encoding, invalid byte sequence).
       */
      private function decode_body($body, $encoding)
      {
          $decoded = false;
          if (function_exists('iconv')) {
              // iconv reports failures as notices/warnings and returns false;
              // they are suppressed here and handled through the return value.
              $decoded = @iconv((string) $encoding, 'UTF-8', (string) $body);
          }

          if ($decoded === false) {
              if ($this->debug) {
                  debug_msg("Ignoring body decoding failure for encoding: " . $encoding);
              }
              return $body;
          }

          return $decoded;
      }

      /**
       * Read one content header frame plus as many body frames as needed to
       * reach the announced body size, and return them as an AMQPMessage.
       *
       * @throws Exception when a frame of an unexpected type arrives
       */
      function wait_content()
      {
          $frm = $this->next_frame();
          $frame_type = $frm[0];
          $payload = $frm[1];
          if ($frame_type != 2) {
              throw new Exception("Expecting Content header");
          }

          // The first 12 bytes are class id, weight and body size; the
          // remainder of the payload is handed to load_properties().
          $payload_reader = new AMQPReader(substr($payload, 0, 12));
          $payload_reader->read_short(); // class id, not used here
          $payload_reader->read_short(); // weight, not used here
          $body_size = $payload_reader->read_longlong();

          $msg = new AMQPMessage();
          $msg->load_properties(substr($payload, 12));

          $body_parts = array();
          $body_received = 0;
          // body size is compared with the bc* functions, as in the original
          while (bccomp($body_size, $body_received) == 1) {
              $frm = $this->next_frame();
              $frame_type = $frm[0];
              $payload = $frm[1];
              if ($frame_type != 3) {
                  throw new Exception("Expecting Content body, received frame type $frame_type");
              }
              $body_parts[] = $payload;
              $body_received = bcadd($body_received, strlen($payload));
          }
          $msg->body = implode("", $body_parts);

          // PHP strings have no decode() method, so the body is converted
          // with a helper that never raises and falls back to the raw bytes.
          if ($this->auto_decode and isset($msg->content_encoding)) {
              $msg->body = $this->decode_body($msg->body, $msg->content_encoding);
          }

          return $msg;
      }

      /**
       * Wait for some expected AMQP methods and dispatch to them.
       * Unexpected methods are queued up for later calls to this method.
       *
       * @param array|null $allowed_methods list of "class,method" signatures
       *                                    to accept; NULL accepts any method
       * @return mixed the value returned by the dispatched handler
       * @throws Exception on a non-method frame or a method frame shorter
       *                   than four bytes
       */
      public function wait($allowed_methods=NULL)
      {
          if ($this->debug) {
              if ($allowed_methods) {
                  debug_msg("waiting for " . implode(", ", $allowed_methods));
              } else {
                  debug_msg("waiting for any method");
              }
          }

          // Process deferred methods
          foreach ($this->method_queue as $qk => $queued_method) {
              if ($this->debug) {
                  debug_msg("checking queue method " . $qk);
              }

              $method_sig = $queued_method[0];
              if ($allowed_methods == NULL || in_array($method_sig, $allowed_methods)) {
                  unset($this->method_queue[$qk]);

                  if ($this->debug) {
                      debug_msg("Executing queued method: $method_sig: " . self::method_name($method_sig));
                  }

                  return $this->dispatch($queued_method[0],
                                         $queued_method[1],
                                         $queued_method[2]);
              }
          }

          // No deferred methods? wait for new ones
          while (true) {
              $frm = $this->next_frame();
              $frame_type = $frm[0];
              $payload = $frm[1];

              if ($frame_type != 1) {
                  throw new Exception("Expecting AMQP method, received frame type: $frame_type");
              }
              if (strlen($payload) < 4) {
                  throw new Exception("Method frame too short");
              }

              // The payload starts with two big-endian shorts: class id, method id
              $method_sig_array = unpack("n2", substr($payload, 0, 4));
              $method_sig = "" . $method_sig_array[1] . "," . $method_sig_array[2];
              $args = new AMQPReader(substr($payload, 4));

              if ($this->debug) {
                  debug_msg("> $method_sig: " . self::method_name($method_sig));
              }

              if (in_array($method_sig, AbstractChannel::$CONTENT_METHODS)) {
                  $content = $this->wait_content();
              } else {
                  $content = NULL;
              }

              if ($allowed_methods == NULL ||
                  in_array($method_sig, $allowed_methods) ||
                  in_array($method_sig, AbstractChannel::$CLOSE_METHODS)) {
                  return $this->dispatch($method_sig, $args, $content);
              }

              // Wasn't what we were looking for? save it for later
              if ($this->debug) {
                  debug_msg("Queueing for later: $method_sig: " . self::method_name($method_sig));
              }
              array_push($this->method_queue, array($method_sig, $args, $content));
          }
      }
  }
  /**
   * A connection to an AMQP 0-8 server over a stream socket.
   *
   * The connection registers itself as channel 0 and performs the
   * start / tune / open handshake in its constructor. It also owns the
   * socket and the framing code used by every channel.
   */
  class AMQPConnection extends AbstractChannel
  {
      public static $AMQP_PROTOCOL_HEADER = "AMQP\x01\x01\x09\x01";

      public static $LIBRARY_PROPERTIES = array(
          "library" => array('S', "PHP Simple AMQP lib"),
          "library_version" => array('S', "0.1")
      );

      protected $method_map = array(
          "10,10" => "start",
          "10,20" => "secure",
          "10,30" => "tune",
          "10,41" => "open_ok",
          "10,50" => "redirect",
          "10,60" => "_close",
          "10,61" => "close_ok"
      );

      // The members below used to be created on the fly. They are declared
      // explicitly, with the public visibility they had as dynamic
      // properties, because dynamic properties are deprecated as of PHP 8.2.
      public $channels;
      public $channel_max;
      public $frame_max;
      public $sock;
      public $input;
      public $wait_tune_ok;
      public $known_hosts;
      public $version_major;
      public $version_minor;
      public $server_properties;
      public $mechanisms;
      public $locales;
      public $heartbeat;

      /**
       * Open the socket and run the connection handshake, following
       * Connection.redirect replies until a server accepts the connection.
       *
       * @param string      $host
       * @param int         $port
       * @param string      $user
       * @param string      $password
       * @param string      $vhost
       * @param bool        $insist
       * @param string      $login_method       SASL mechanism name
       * @param string|null $login_response     ready-made SASL response; only
       *                                        used when $user/$password are
       *                                        not both given
       * @param string      $locale
       * @param int         $connection_timeout seconds, passed to fsockopen()
       * @param int         $read_write_timeout seconds, passed to stream_set_timeout()
       * @throws Exception when the socket cannot be opened
       */
      public function __construct($host, $port,
                                  $user, $password,
                                  $vhost="/", $insist=false,
                                  $login_method="AMQPLAIN",
                                  $login_response=NULL,
                                  $locale="en_US",
                                  $connection_timeout = 3,
                                  $read_write_timeout = 3)
      {
          // Credentials take precedence. Without them the caller-supplied
          // $login_response is kept (it used to be overwritten with NULL,
          // which made the parameter useless).
          if ($user && $password) {
              $credentials = new AMQPWriter();
              $credentials->write_table(array("LOGIN" => array('S', $user),
                                              "PASSWORD" => array('S', $password)));
              $login_response = substr($credentials->getvalue(), 4); //Skip the length
          }

          $d = AMQPConnection::$LIBRARY_PROPERTIES;

          while (true) {
              $this->channels = array();
              // The connection object itself is treated as channel 0
              parent::__construct($this, 0);

              $this->channel_max = 65535;
              $this->frame_max = 131072;

              $errstr = $errno = NULL;
              $this->sock = NULL;
              if (!($this->sock = fsockopen($host, $port, $errno, $errstr, $connection_timeout))) {
                  throw new Exception ("Error Connecting to server($errno): $errstr ");
              }
              stream_set_timeout($this->sock, $read_write_timeout);
              stream_set_blocking($this->sock, 1);
              $this->input = new AMQPReader(null, $this->sock);

              $this->write(AMQPConnection::$AMQP_PROTOCOL_HEADER);
              $this->wait(array("10,10"));
              $this->x_start_ok($d, $login_method, $login_response, $locale);

              // x_tune_ok() clears this flag once Connection.tune was answered
              $this->wait_tune_ok = true;
              while ($this->wait_tune_ok) {
                  $this->wait(array(
                      "10,20", // secure
                      "10,30", // tune
                  ));
              }

              $host = $this->x_open($vhost, "", $insist);
              if (!$host) {
                  return; // we weren't redirected
              }

              // we were redirected, close the socket, loop and try again
              if ($this->debug) {
                  debug_msg("closing socket");
              }
              @fclose($this->sock); $this->sock=NULL;
          }
      }

      public function __destruct()
      {
          if (isset($this->input)) {
              if ($this->input) {
                  $this->close();
              }
          }

          if ($this->sock) {
              if ($this->debug) {
                  debug_msg("closing socket");
              }
              @fclose($this->sock);
          }
      }

      /**
       * Write all of $data to the socket, retrying after partial writes.
       *
       * @throws Exception when fwrite() fails or writes nothing
       */
      protected function write($data)
      {
          if ($this->debug) {
              debug_msg("< [hex]:\n" . hexdump($data, false, true, true));
          }

          $len = strlen($data);
          while (true) {
              $written = fwrite($this->sock, $data);
              if ($written == false || $written <= 0) {
                  throw new Exception ("Error sending data");
              }

              $len = $len - $written;
              if ($len <= 0) {
                  break;
              }
              // keep only the unsent tail
              $data = substr($data, 0 - $len);
          }
      }

      /**
       * Release the reader and the socket.
       */
      protected function do_close()
      {
          if (isset($this->input)) {
              if ($this->input) {
                  $this->input->close();
                  $this->input = NULL;
              }
          }

          if ($this->sock) {
              if ($this->debug) {
                  debug_msg("closing socket");
              }
              @fclose($this->sock);
              $this->sock = NULL;
          }
      }

      /**
       * Lowest channel number in 1..channel_max that is not in use.
       *
       * @throws Exception when every channel number is taken
       */
      public function get_free_channel_id()
      {
          for ($i = 1; $i <= $this->channel_max; $i++) {
              if (!array_key_exists($i, $this->channels)) {
                  return $i;
              }
          }
          throw new Exception("No free channel ids");
      }

      /**
       * Send a content header frame followed by the body, split into body
       * frames of at most frame_max - 8 bytes of payload each.
       */
      public function send_content($channel, $class_id, $weight, $body_size,
                                   $packed_properties, $body)
      {
          $pkt = new AMQPWriter();
          $pkt->write_octet(2);
          $pkt->write_short($channel);
          $pkt->write_long(strlen($packed_properties) + 12);
          $pkt->write_short($class_id);
          $pkt->write_short($weight);
          $pkt->write_longlong($body_size);
          $pkt->write($packed_properties);
          $pkt->write_octet(0xCE);
          $pkt = $pkt->getvalue();
          $this->write($pkt);

          $chunk_size = $this->frame_max - 8;
          // Loop on the remaining length, not on truthiness: the string "0"
          // is falsy in PHP and would otherwise never be sent.
          while (strlen((string) $body) > 0) {
              $payload = substr($body, 0, $chunk_size);
              $body = substr($body, $chunk_size);

              $pkt = new AMQPWriter();
              $pkt->write_octet(3);
              $pkt->write_short($channel);
              $pkt->write_long(strlen($payload));
              $pkt->write($payload);
              $pkt->write_octet(0xCE);
              $pkt = $pkt->getvalue();
              $this->write($pkt);
          }
      }

      /**
       * Frame a method call and write it to the socket.
       *
       * @param int              $channel    channel number
       * @param array            $method_sig (class_id, method_id) pair
       * @param AMQPWriter|string $args      encoded arguments
       */
      protected function send_channel_method_frame($channel, $method_sig, $args="")
      {
          if ($args instanceof AMQPWriter) {
              $args = $args->getvalue();
          }

          $pkt = new AMQPWriter();
          $pkt->write_octet(1);
          $pkt->write_short($channel);
          $pkt->write_long(strlen($args) + 4); // 4 = length of class_id and method_id
                                               // in payload
          $pkt->write_short($method_sig[0]); // class_id
          $pkt->write_short($method_sig[1]); // method_id
          $pkt->write($args);
          $pkt->write_octet(0xCE);
          $pkt = $pkt->getvalue();
          $this->write($pkt);

          if ($this->debug) {
              debug_msg("< " . methodSig($method_sig) . ": " . AbstractChannel::$GLOBAL_METHOD_NAMES[methodSig($method_sig)]);
          }
      }

      /**
       * Wait for a frame from the server
       *
       * @return array array(frame_type, channel, payload)
       * @throws Exception when the frame does not end with the 0xCE marker
       */
      protected function wait_frame()
      {
          $frame_type = $this->input->read_octet();
          $channel = $this->input->read_short();
          $size = $this->input->read_long();
          $payload = $this->input->read($size);

          $ch = $this->input->read_octet();
          if ($ch != 0xCE) {
              throw new Exception(sprintf("Framing error, unexpected byte: %x", $ch));
          }

          return array($frame_type, $channel, $payload);
      }

      /**
       * Wait for a frame from the server destined for
       * a particular channel.
       *
       * @return array array(frame_type, payload)
       */
      protected function wait_channel($channel_id)
      {
          while (true) {
              list($frame_type, $frame_channel, $payload) = $this->wait_frame();
              if ($frame_channel == $channel_id) {
                  return array($frame_type, $payload);
              }

              // Not the channel we were looking for. Queue this frame
              // for later, when the other channel is looking for frames.
              // NOTE(review): a frame for a channel that is not registered in
              // $this->channels is not handled here - confirm whether such
              // frames can arrive (e.g. after a channel was closed).
              array_push($this->channels[$frame_channel]->frame_queue,
                         array($frame_type, $payload));

              // If we just queued up a method for channel 0 (the Connection
              // itself) it's probably a close method in reaction to some
              // error, so deal with it right away.
              if (($frame_type == 1) && ($frame_channel == 0)) {
                  $this->wait();
              }
          }
      }

      /**
       * Fetch a Channel object identified by the numeric channel_id, or
       * create that object if it doesn't already exist.
       */
      public function channel($channel_id=NULL)
      {
          if (array_key_exists($channel_id, $this->channels)) {
              return $this->channels[$channel_id];
          }

          return new AMQPChannel($this->connection, $channel_id);
      }

      /**
       * request a connection close
       *
       * Returns without waiting when sending the close method fails.
       */
      public function close($reply_code=0, $reply_text="", $method_sig=array(0, 0))
      {
          try {
              $args = new AMQPWriter();
              $args->write_short($reply_code);
              $args->write_shortstr($reply_text);
              $args->write_short($method_sig[0]); // class_id
              $args->write_short($method_sig[1]); // method_id
              $this->send_method_frame(array(10, 60), $args);
          } catch (Exception $e) {
              return;
          }

          return $this->wait(array(
              "10,61", // Connection.close_ok
          ));
      }

      /**
       * Render a field table as "name=value, name=value".
       *
       * Each entry is array(type, value): 'D' values are printed as
       * "<n>E<e>", 'F' values (nested tables) recursively inside
       * parentheses, 'T' values as "Y-m-d H:i:s" dates, anything else as is.
       *
       * @param array $table
       * @return string
       */
      public static function dump_table($table)
      {
          $tokens = array();
          foreach ($table as $name => $value) {
              switch ($value[0]) {
                  case 'D':
                      $val = $value[1]->n . 'E' . $value[1]->e;
                      break;
                  case 'F':
                      $val = '(' . self::dump_table($value[1]) . ')';
                      // this break was missing: nested tables fell through
                      // to the timestamp branch and were passed to date()
                      break;
                  case 'T':
                      $val = date('Y-m-d H:i:s', $value[1]);
                      break;
                  default:
                      $val = $value[1];
              }
              $tokens[] = $name . '=' . $val;
          }

          return implode(', ', $tokens);
      }

      /**
       * Handle Connection.close from the server: acknowledge it, tear the
       * connection down and report the reason as an exception.
       *
       * @throws AMQPConnectionException always
       */
      protected function _close($args)
      {
          $reply_code = $args->read_short();
          $reply_text = $args->read_shortstr();
          $class_id = $args->read_short();
          $method_id = $args->read_short();

          $this->x_close_ok();

          throw new AMQPConnectionException($reply_code, $reply_text, array($class_id, $method_id));
      }

      /**
       * confirm a connection close
       */
      protected function x_close_ok()
      {
          $this->send_method_frame(array(10, 61));
          $this->do_close();
      }

      /**
       * confirm a connection close
       */
      protected function close_ok($args)
      {
          $this->do_close();
      }

      /**
       * Send Connection.open and wait for open_ok or redirect.
       *
       * @return string|null the host to redirect to, or NULL when opened
       */
      protected function x_open($virtual_host, $capabilities="", $insist=false)
      {
          $args = new AMQPWriter();
          $args->write_shortstr($virtual_host);
          $args->write_shortstr($capabilities);
          $args->write_bit($insist);
          $this->send_method_frame(array(10, 40), $args);

          return $this->wait(array(
              "10,41", // Connection.open_ok
              "10,50"  // Connection.redirect
          ));
      }

      /**
       * signal that the connection is ready
       */
      protected function open_ok($args)
      {
          $this->known_hosts = $args->read_shortstr();
          if ($this->debug) {
              debug_msg("Open OK! known_hosts: " . $this->known_hosts);
          }

          return NULL;
      }

      /**
       * asks the client to use a different server
       */
      protected function redirect($args)
      {
          $host = $args->read_shortstr();
          $this->known_hosts = $args->read_shortstr();
          if ($this->debug) {
              debug_msg("Redirected to [". $host . "], known_hosts [" . $this->known_hosts . "]" );
          }

          return $host;
      }

      /**
       * security mechanism challenge
       *
       * The challenge is read and discarded; no reply is sent from here.
       */
      protected function secure($args)
      {
          $args->read_longstr();
      }

      /**
       * security mechanism response
       */
      protected function x_secure_ok($response)
      {
          $args = new AMQPWriter();
          $args->write_longstr($response);
          $this->send_method_frame(array(10, 21), $args);
      }

      /**
       * start connection negotiation
       */
      protected function start($args)
      {
          $this->version_major = $args->read_octet();
          $this->version_minor = $args->read_octet();
          $this->server_properties = $args->read_table();
          $this->mechanisms = explode(" ", $args->read_longstr());
          $this->locales = explode(" ", $args->read_longstr());

          if ($this->debug) {
              debug_msg(sprintf("Start from server, version: %d.%d, properties: %s, mechanisms: %s, locales: %s",
                                $this->version_major,
                                $this->version_minor,
                                self::dump_table($this->server_properties),
                                implode(', ', $this->mechanisms),
                                implode(', ', $this->locales)));
          }
      }

      /**
       * Answer Connection.start with the client properties, the chosen
       * mechanism, its response and the locale.
       */
      protected function x_start_ok($client_properties, $mechanism, $response, $locale)
      {
          $args = new AMQPWriter();
          $args->write_table($client_properties);
          $args->write_shortstr($mechanism);
          $args->write_longstr($response);
          $args->write_shortstr($locale);
          $this->send_method_frame(array(10, 11), $args);
      }

      /**
       * propose connection tuning parameters
       *
       * Non-zero channel_max / frame_max values from the server replace the
       * local defaults; the reply always asks for heartbeat 0.
       */
      protected function tune($args)
      {
          $v = $args->read_short();
          if ($v) {
              $this->channel_max = $v;
          }

          $v = $args->read_long();
          if ($v) {
              $this->frame_max = $v;
          }

          $this->heartbeat = $args->read_short();

          $this->x_tune_ok($this->channel_max, $this->frame_max, 0);
      }

      /**
       * negotiate connection tuning parameters
       */
      protected function x_tune_ok($channel_max, $frame_max, $heartbeat)
      {
          $args = new AMQPWriter();
          $args->write_short($channel_max);
          $args->write_long($frame_max);
          $args->write_short($heartbeat);
          $this->send_method_frame(array(10, 31), $args);
          // lets the constructor leave its tune loop
          $this->wait_tune_ok = False;
      }
  }
  681. class AMQPChannel extends AbstractChannel
  682. {
  // Maps the signature of each method the server may send on a channel to
  // the name of the handler that dispatch() calls with the argument reader.
  protected $method_map = array(
      "20,11" => "open_ok",
      // Channel.flow sent by the server is handled by _flow($args);
      // flow($active) is the public call that *sends* Channel.flow and must
      // not be invoked with an argument reader.
      "20,20" => "_flow",
      "20,21" => "flow_ok",
      "20,30" => "alert",
      "20,40" => "_close",
      "20,41" => "close_ok",
      "30,11" => "access_request_ok",
      "40,11" => "exchange_declare_ok",
      "40,21" => "exchange_delete_ok",
      "50,11" => "queue_declare_ok",
      "50,21" => "queue_bind_ok",
      "50,31" => "queue_purge_ok",
      "50,41" => "queue_delete_ok",
      "50,51" => "queue_unbind_ok",
      "60,11" => "basic_qos_ok",
      "60,21" => "basic_consume_ok",
      "60,31" => "basic_cancel_ok",
      "60,50" => "basic_return",
      "60,60" => "basic_deliver",
      "60,71" => "basic_get_ok",
      "60,72" => "basic_get_empty",
      "90,11" => "tx_select_ok",
      "90,21" => "tx_commit_ok",
      "90,31" => "tx_rollback_ok"
  );
  /**
   * Register a channel on $connection and open it on the server.
   *
   * @param object   $connection  owning connection
   * @param int|null $channel_id  channel number; a free one is requested
   *                              from the connection when empty
   * @param bool     $auto_decode stored in $this->auto_decode
   */
  public function __construct($connection,
                              $channel_id=NULL,
                              $auto_decode=true)
  {
      if ($channel_id == NULL) {
          $channel_id = $connection->get_free_channel_id();
      }

      parent::__construct($connection, $channel_id);

      if ($this->debug) {
          debug_msg("using channel_id: " . $channel_id);
      }

      // Initial channel state; the parent constructor's auto_decode default
      // is replaced by the caller's choice.
      $this->auto_decode = $auto_decode;
      $this->callbacks = array();
      $this->alerts = array();
      $this->active = true; // Flow control
      $this->is_open = false;
      $this->default_ticket = 0;

      $this->x_open();
  }
  /**
   * Intentionally does nothing.
   *
   * TODO (carried over from the original): decide whether a channel that
   * still has a connection should call close("destroying channel") here.
   */
  public function __destruct()
  {
  }
  /**
   * Tear down this object, after we've agreed to close with the server:
   * mark it closed, remove it from the connection's channel list and drop
   * the references to the connection and the channel number.
   */
  protected function do_close()
  {
      $this->is_open = false;

      $owner = $this->connection;
      unset($owner->channels[$this->channel_id]);

      $this->connection = NULL;
      $this->channel_id = NULL;
  }
  /**
   * Handle Channel.alert, a non-fatal warning sent by the server.
   *
   * It is used for methods that are normally asynchronous and thus have
   * no confirmation, but for which the server may still detect errors
   * worth reporting. Fatal errors arrive as channel or connection
   * exceptions instead. Each alert is appended to $this->alerts as
   * array(reply_code, reply_text, details).
   */
  protected function alert($args)
  {
      $code = $args->read_short();
      $text = $args->read_shortstr();
      $details = $args->read_table();

      $this->alerts[] = array($code, $text, $details);
  }
  /**
   * Request a channel close and wait for Channel.close_ok.
   *
   * @param int    $reply_code
   * @param string $reply_text
   * @param array  $method_sig (class_id, method_id) of the offending method
   * @return mixed result of the dispatched close_ok handler
   */
  public function close($reply_code=0,
                        $reply_text="",
                        $method_sig=array(0, 0))
  {
      $request = new AMQPWriter();
      $request->write_short($reply_code);
      $request->write_shortstr($reply_text);
      $request->write_short($method_sig[0]); // class_id
      $request->write_short($method_sig[1]); // method_id
      $this->send_method_frame(array(20, 40), $request);

      $expected = array(
          "20,41" // Channel.close_ok
      );
      return $this->wait($expected);
  }
  /**
   * Handle Channel.close from the server: confirm it with close_ok, tear
   * the channel down and report the reason to the caller.
   *
   * @throws AMQPChannelException always
   */
  protected function _close($args)
  {
      $code = $args->read_short();
      $text = $args->read_shortstr();
      $failed_class = $args->read_short();
      $failed_method = $args->read_short();

      $this->send_method_frame(array(20, 41));
      $this->do_close();

      $failed_sig = array($failed_class, $failed_method);
      throw new AMQPChannelException($code, $text, $failed_sig);
  }
  /**
   * Handle Channel.close_ok: the server confirmed the close, so the
   * channel object is torn down. The arguments are not read.
   */
  protected function close_ok($args)
  {
      $this->do_close();
  }
  /**
   * Enable or disable flow from the peer: sends Channel.flow and waits
   * for Channel.flow_ok.
   *
   * @param bool $active
   * @return mixed result of the dispatched flow_ok handler
   */
  public function flow($active)
  {
      $request = new AMQPWriter();
      $request->write_bit($active);
      $this->send_method_frame(array(20, 20), $request);

      $expected = array(
          "20,21" //Channel.flow_ok
      );
      return $this->wait($expected);
  }
  /**
   * Apply a flow setting read from $args: store it in $this->active and
   * answer with Channel.flow_ok carrying the same value.
   */
  protected function _flow($args)
  {
      $requested = $args->read_bit();
      $this->active = $requested;
      $this->x_flow_ok($this->active);
  }
  /**
   * Send Channel.flow_ok with the given flow state.
   *
   * @param bool $active
   */
  protected function x_flow_ok($active)
  {
      $reply = new AMQPWriter();
      $reply->write_bit($active);

      $this->send_method_frame(array(20, 21), $reply);
  }
  /**
   * Handle Channel.flow_ok.
   *
   * @return mixed the bit read from the reply
   */
  protected function flow_ok($args)
  {
      $confirmed = $args->read_bit();

      return $confirmed;
  }
  /**
   * Send Channel.open and wait for Channel.open_ok. Does nothing (and
   * returns NULL) when the channel is already open.
   *
   * @param string $out_of_band
   */
  protected function x_open($out_of_band="")
  {
      if (!$this->is_open) {
          $request = new AMQPWriter();
          $request->write_shortstr($out_of_band);
          $this->send_method_frame(array(20, 10), $request);

          $expected = array(
              "20,11" //Channel.open_ok
          );
          return $this->wait($expected);
      }

      return;
  }
  /**
   * Handler for the open confirmation: marks the channel as open and,
   * when $this->debug is set, emits a debug message.
   *
   * @param mixed $args method arguments; not read by this handler
   */
  protected function open_ok($args)
  {
    $this->is_open = true;

    if (!$this->debug) {
      return;
    }
    debug_msg("Channel open");
  }
  /**
   * Request an access ticket for a realm.
   *
   * Sends method frame (30, 10) containing the realm followed by five
   * bits (exclusive, passive, active, write, read — in that order) and
   * waits for Channel.access_request_ok ("30,11").
   *
   * @param string $realm     written as a short string
   * @param mixed  $exclusive flag bit
   * @param mixed  $passive   flag bit
   * @param mixed  $active    flag bit
   * @param mixed  $write     flag bit
   * @param mixed  $read      flag bit
   * @return mixed result of wait() for the access_request_ok reply
   */
  public function access_request($realm, $exclusive=false,
      $passive=false, $active=false, $write=false, $read=false)
  {
    $frame = new AMQPWriter();
    $frame->write_shortstr($realm);
    foreach (array($exclusive, $passive, $active, $write, $read) as $flag) {
      $frame->write_bit($flag);
    }
    $this->send_method_frame(array(30, 10), $frame);

    $expected = array("30,11"); // Channel.access_request_ok
    return $this->wait($expected);
  }
  /**
   * Handler for the access grant: stores the short read from $args as
   * the channel's default ticket and returns it.
   *
   * @param mixed $args reader over the method arguments
   * @return mixed the ticket now held in $this->default_ticket
   */
  protected function access_request_ok($args)
  {
    return $this->default_ticket = $args->read_short();
  }
  /**
   * Declare an exchange.
   *
   * Sends method frame (40, 10). Unless $nowait is set, waits for
   * Channel.exchange_declare_ok ("40,11").
   *
   * @param string $exchange    written as a short string
   * @param string $type        written as a short string
   * @param mixed  $passive     flag bit
   * @param mixed  $durable     flag bit
   * @param mixed  $auto_delete flag bit
   * @param mixed  $internal    flag bit
   * @param mixed  $nowait      flag bit; when set no reply is awaited
   * @param mixed  $arguments   table; anything loosely equal to NULL becomes an empty array
   * @param mixed  $ticket      ticket to send; anything loosely equal to NULL selects $this->default_ticket
   * @return mixed result of wait(), or null when $nowait is set
   */
  public function exchange_declare($exchange,
                                   $type,
                                   $passive=false,
                                   $durable=false,
                                   $auto_delete=true,
                                   $internal=false,
                                   $nowait=false,
                                   $arguments=NULL,
                                   $ticket=NULL)
  {
    $table = ($arguments == NULL) ? array() : $arguments;

    $frame = new AMQPWriter();
    $frame->write_short(($ticket != NULL) ? $ticket : $this->default_ticket);
    $frame->write_shortstr($exchange);
    $frame->write_shortstr($type);
    foreach (array($passive, $durable, $auto_delete, $internal, $nowait) as $flag) {
      $frame->write_bit($flag);
    }
    $frame->write_table($table);
    $this->send_method_frame(array(40, 10), $frame);

    if ($nowait) {
      return;
    }
    $expected = array("40,11"); // Channel.exchange_declare_ok
    return $this->wait($expected);
  }
  /**
   * Handler for the exchange declaration confirmation.
   * Intentionally a no-op: nothing is read from $args and nothing is returned.
   *
   * @param mixed $args method arguments; unused
   */
  protected function exchange_declare_ok($args)
  {
    // nothing to do
  }
  /**
   * Delete an exchange.
   *
   * Sends method frame (40, 20). Unless $nowait is set, waits for
   * Channel.exchange_delete_ok ("40,21").
   *
   * @param string $exchange  written as a short string
   * @param mixed  $if_unused flag bit
   * @param mixed  $nowait    flag bit; when set no reply is awaited
   * @param mixed  $ticket    ticket to send; anything loosely equal to NULL selects $this->default_ticket
   * @return mixed result of wait(), or null when $nowait is set
   */
  public function exchange_delete($exchange, $if_unused=false,
      $nowait=false, $ticket=NULL)
  {
    $frame = new AMQPWriter();
    $frame->write_short(($ticket != NULL) ? $ticket : $this->default_ticket);
    $frame->write_shortstr($exchange);
    $frame->write_bit($if_unused);
    $frame->write_bit($nowait);
    $this->send_method_frame(array(40, 20), $frame);

    if ($nowait) {
      return;
    }
    $expected = array("40,21"); // Channel.exchange_delete_ok
    return $this->wait($expected);
  }
  /**
   * Handler for the exchange deletion confirmation.
   * Intentionally a no-op: nothing is read from $args and nothing is returned.
   *
   * @param mixed $args method arguments; unused
   */
  protected function exchange_delete_ok($args)
  {
    // nothing to do
  }
  /**
   * Bind a queue to an exchange.
   *
   * Sends method frame (50, 20). Unless $nowait is set, waits for
   * Channel.queue_bind_ok ("50,21").
   *
   * @param string $queue       written as a short string
   * @param string $exchange    written as a short string
   * @param string $routing_key written as a short string
   * @param mixed  $nowait      flag bit; when set no reply is awaited
   * @param mixed  $arguments   table; anything loosely equal to NULL becomes an empty array
   * @param mixed  $ticket      ticket to send; anything loosely equal to NULL selects $this->default_ticket
   * @return mixed result of wait(), or null when $nowait is set
   */
  public function queue_bind($queue, $exchange, $routing_key="",
      $nowait=false, $arguments=NULL, $ticket=NULL)
  {
    $table = ($arguments == NULL) ? array() : $arguments;

    $frame = new AMQPWriter();
    $frame->write_short(($ticket != NULL) ? $ticket : $this->default_ticket);
    foreach (array($queue, $exchange, $routing_key) as $name) {
      $frame->write_shortstr($name);
    }
    $frame->write_bit($nowait);
    $frame->write_table($table);
    $this->send_method_frame(array(50, 20), $frame);

    if ($nowait) {
      return;
    }
    $expected = array("50,21"); // Channel.queue_bind_ok
    return $this->wait($expected);
  }
  /**
   * Handler for the bind confirmation.
   * Intentionally a no-op: nothing is read from $args and nothing is returned.
   *
   * @param mixed $args method arguments; unused
   */
  protected function queue_bind_ok($args)
  {
    // nothing to do
  }
  /**
   * Remove a binding between a queue and an exchange.
   *
   * Sends method frame (50, 50) and always waits for
   * Channel.queue_unbind_ok ("50,51"); this method has no nowait flag.
   *
   * @param string $queue       written as a short string
   * @param string $exchange    written as a short string
   * @param string $routing_key written as a short string
   * @param mixed  $arguments   table; anything loosely equal to NULL becomes an empty array
   * @param mixed  $ticket      ticket to send; anything loosely equal to NULL selects $this->default_ticket
   * @return mixed result of wait() for the queue_unbind_ok reply
   */
  public function queue_unbind($queue, $exchange, $routing_key="",
      $arguments=NULL, $ticket=NULL)
  {
    $table = ($arguments == NULL) ? array() : $arguments;

    $frame = new AMQPWriter();
    $frame->write_short(($ticket != NULL) ? $ticket : $this->default_ticket);
    foreach (array($queue, $exchange, $routing_key) as $name) {
      $frame->write_shortstr($name);
    }
    $frame->write_table($table);
    $this->send_method_frame(array(50, 50), $frame);

    $expected = array("50,51"); // Channel.queue_unbind_ok
    return $this->wait($expected);
  }
  /**
   * Handler for the unbind confirmation.
   * Intentionally a no-op: nothing is read from $args and nothing is returned.
   *
   * @param mixed $args method arguments; unused
   */
  protected function queue_unbind_ok($args)
  {
    // nothing to do
  }
  /**
   * Declare a queue.
   *
   * Sends method frame (50, 10). Unless $nowait is set, waits for
   * Channel.queue_declare_ok ("50,11").
   *
   * @param string $queue       written as a short string
   * @param mixed  $passive     flag bit
   * @param mixed  $durable     flag bit
   * @param mixed  $exclusive   flag bit
   * @param mixed  $auto_delete flag bit
   * @param mixed  $nowait      flag bit; when set no reply is awaited
   * @param mixed  $arguments   table; anything loosely equal to NULL becomes an empty array
   * @param mixed  $ticket      ticket to send; anything loosely equal to NULL selects $this->default_ticket
   * @return mixed result of wait(), or null when $nowait is set
   */
  public function queue_declare($queue="",
                                $passive=false,
                                $durable=false,
                                $exclusive=false,
                                $auto_delete=true,
                                $nowait=false,
                                $arguments=NULL,
                                $ticket=NULL)
  {
    $table = ($arguments == NULL) ? array() : $arguments;

    $frame = new AMQPWriter();
    $frame->write_short(($ticket != NULL) ? $ticket : $this->default_ticket);
    $frame->write_shortstr($queue);
    foreach (array($passive, $durable, $exclusive, $auto_delete, $nowait) as $flag) {
      $frame->write_bit($flag);
    }
    $frame->write_table($table);
    $this->send_method_frame(array(50, 10), $frame);

    if ($nowait) {
      return;
    }
    $expected = array("50,11"); // Channel.queue_declare_ok
    return $this->wait($expected);
  }
  /**
   * Handler for the queue declaration confirmation.
   *
   * @param mixed $args reader over the method arguments
   * @return array three elements in wire order:
   *               queue name (short string), message count (long),
   *               consumer count (long)
   */
  protected function queue_declare_ok($args)
  {
    // Array elements are evaluated left to right, matching the wire order.
    return array(
      $args->read_shortstr(),
      $args->read_long(),
      $args->read_long()
    );
  }
  /**
   * Delete a queue.
   *
   * Sends method frame (50, 40). Unless $nowait is set, waits for
   * Channel.queue_delete_ok ("50,41").
   *
   * @param string $queue     written as a short string
   * @param mixed  $if_unused flag bit
   * @param mixed  $if_empty  flag bit
   * @param mixed  $nowait    flag bit; when set no reply is awaited
   * @param mixed  $ticket    ticket to send; anything loosely equal to NULL selects $this->default_ticket
   * @return mixed result of wait(), or null when $nowait is set
   */
  public function queue_delete($queue="", $if_unused=false, $if_empty=false,
      $nowait=false, $ticket=NULL)
  {
    $frame = new AMQPWriter();
    $frame->write_short(($ticket != NULL) ? $ticket : $this->default_ticket);
    $frame->write_shortstr($queue);
    foreach (array($if_unused, $if_empty, $nowait) as $flag) {
      $frame->write_bit($flag);
    }
    $this->send_method_frame(array(50, 40), $frame);

    if ($nowait) {
      return;
    }
    $expected = array("50,41"); // Channel.queue_delete_ok
    return $this->wait($expected);
  }
  /**
   * Handler for the queue deletion confirmation.
   *
   * @param mixed $args reader over the method arguments
   * @return mixed the long read from $args
   *               (presumably a message count — TODO confirm against the protocol spec)
   */
  protected function queue_delete_ok($args)
  {
    $value = $args->read_long();
    return $value;
  }
  /**
   * Purge a queue.
   *
   * Sends method frame (50, 30). Unless $nowait is set, waits for
   * Channel.queue_purge_ok ("50,31").
   *
   * @param string $queue  written as a short string
   * @param mixed  $nowait flag bit; when set no reply is awaited
   * @param mixed  $ticket ticket to send; anything loosely equal to NULL selects $this->default_ticket
   * @return mixed result of wait(), or null when $nowait is set
   */
  public function queue_purge($queue="", $nowait=false, $ticket=NULL)
  {
    $frame = new AMQPWriter();
    $frame->write_short(($ticket != NULL) ? $ticket : $this->default_ticket);
    $frame->write_shortstr($queue);
    $frame->write_bit($nowait);
    $this->send_method_frame(array(50, 30), $frame);

    if ($nowait) {
      return;
    }
    $expected = array("50,31"); // Channel.queue_purge_ok
    return $this->wait($expected);
  }
  /**
   * Handler for the queue purge confirmation.
   *
   * @param mixed $args reader over the method arguments
   * @return mixed the long read from $args
   *               (presumably a message count — TODO confirm against the protocol spec)
   */
  protected function queue_purge_ok($args)
  {
    $value = $args->read_long();
    return $value;
  }
  /**
   * Acknowledge one or more messages.
   *
   * Sends method frame (60, 80); no reply is awaited.
   *
   * @param mixed $delivery_tag written as a long-long
   * @param mixed $multiple     flag bit
   */
  public function basic_ack($delivery_tag, $multiple=false)
  {
    $frame = new AMQPWriter();
    $frame->write_longlong($delivery_tag);
    $frame->write_bit($multiple);

    $this->send_method_frame(array(60, 80), $frame);
  }
  /**
   * End a queue consumer.
   *
   * Sends method frame (60, 30). Without $nowait the call blocks until
   * Channel.basic_cancel_ok ("60,31") arrives; that handler removes the
   * consumer's callback. With $nowait no confirmation is awaited (the
   * no-wait bit tells the server not to send one), so the callback is
   * removed here instead and the method returns immediately.
   *
   * @param string $consumer_tag tag of the consumer to cancel
   * @param mixed  $nowait       flag bit; when set no reply is awaited
   * @return mixed result of wait(), or null when $nowait is set
   */
  public function basic_cancel($consumer_tag, $nowait=false)
  {
    $args = new AMQPWriter();
    $args->write_shortstr($consumer_tag);
    $args->write_bit($nowait);
    $this->send_method_frame(array(60, 30), $args);

    if ($nowait) {
      // No cancel_ok will arrive, so waiting for it would block.
      // Drop the callback locally, as basic_cancel_ok() would have done.
      unset($this->callbacks[$consumer_tag]);
      return;
    }

    return $this->wait(array(
      "60,31" // Channel.basic_cancel_ok
    ));
  }
  /**
   * Handler for the consumer cancellation confirmation: removes the
   * callback registered under the consumer tag read from $args.
   *
   * @param mixed $args reader over the method arguments
   */
  protected function basic_cancel_ok($args)
  {
    $tag = $args->read_shortstr();
    unset($this->callbacks[$tag]);
  }
  /**
   * Start a queue consumer.
   *
   * Sends method frame (60, 20). Unless $nowait is set, waits for
   * Channel.basic_consume_ok ("60,21") and uses the value returned by
   * wait() as the consumer tag; with $nowait the caller-supplied tag is
   * kept. The callback is then stored in $this->callbacks under that tag.
   *
   * @param string $queue        written as a short string
   * @param string $consumer_tag written as a short string
   * @param mixed  $no_local     flag bit
   * @param mixed  $no_ack       flag bit
   * @param mixed  $exclusive    flag bit
   * @param mixed  $nowait       flag bit; when set no reply is awaited
   * @param mixed  $callback     stored as-is; invoked by basic_deliver()
   * @param mixed  $ticket       ticket to send; anything loosely equal to NULL selects $this->default_ticket
   * @return mixed the consumer tag the callback was registered under
   */
  public function basic_consume($queue="", $consumer_tag="", $no_local=false,
      $no_ack=false, $exclusive=false, $nowait=false,
      $callback=NULL, $ticket=NULL)
  {
    $frame = new AMQPWriter();
    $frame->write_short(($ticket != NULL) ? $ticket : $this->default_ticket);
    $frame->write_shortstr($queue);
    $frame->write_shortstr($consumer_tag);
    foreach (array($no_local, $no_ack, $exclusive, $nowait) as $flag) {
      $frame->write_bit($flag);
    }
    $this->send_method_frame(array(60, 20), $frame);

    $tag = $consumer_tag;
    if (!$nowait) {
      $expected = array("60,21"); // Channel.basic_consume_ok
      $tag = $this->wait($expected);
    }

    $this->callbacks[$tag] = $callback;
    return $tag;
  }
  /**
   * Handler for the new-consumer confirmation.
   *
   * @param mixed $args reader over the method arguments
   * @return mixed the short string read from $args (used by basic_consume() as the consumer tag)
   */
  protected function basic_consume_ok($args)
  {
    $tag = $args->read_shortstr();
    return $tag;
  }
  /**
   * Handler for a message delivered to a consumer.
   *
   * Reads the delivery arguments, attaches them to the message as
   * $msg->delivery_info and, if a non-null callback is registered for
   * the consumer tag, invokes it with the message.
   *
   * @param mixed $args reader over the method arguments
   * @param mixed $msg  message object; gains a delivery_info array
   */
  protected function basic_deliver($args, $msg)
  {
    // Arguments must be read in wire order.
    $tag         = $args->read_shortstr();
    $delivery    = $args->read_longlong();
    $again       = $args->read_bit();
    $source      = $args->read_shortstr();
    $key         = $args->read_shortstr();

    $msg->delivery_info = array(
      "channel" => $this,
      "consumer_tag" => $tag,
      "delivery_tag" => $delivery,
      "redelivered" => $again,
      "exchange" => $source,
      "routing_key" => $key
    );

    $handler = array_key_exists($tag, $this->callbacks)
      ? $this->callbacks[$tag]
      : NULL;

    // Unknown tag or no callback registered: the message is silently dropped.
    if ($handler == NULL) {
      return;
    }
    call_user_func($handler, $msg);
  }
  /**
   * Fetch a message directly from a queue.
   *
   * Sends method frame (60, 70) and waits for either
   * Channel.basic_get_ok ("60,71") or Channel.basic_get_empty ("60,72").
   *
   * @param string $queue  written as a short string
   * @param mixed  $no_ack flag bit
   * @param mixed  $ticket ticket to send; anything loosely equal to NULL selects $this->default_ticket
   * @return mixed result of wait() for whichever reply arrives
   */
  public function basic_get($queue="", $no_ack=false, $ticket=NULL)
  {
    $frame = new AMQPWriter();
    $frame->write_short(($ticket != NULL) ? $ticket : $this->default_ticket);
    $frame->write_shortstr($queue);
    $frame->write_bit($no_ack);
    $this->send_method_frame(array(60, 70), $frame);

    $expected = array(
      "60,71", // Channel.basic_get_ok
      "60,72"  // Channel.basic_get_empty
    );
    return $this->wait($expected);
  }
  /**
   * Handler for the "no messages available" reply.
   * Reads one short string (the cluster id) from $args and discards it;
   * nothing is returned.
   *
   * @param mixed $args reader over the method arguments
   */
  protected function basic_get_empty($args)
  {
    $args->read_shortstr(); // cluster id, not used
  }
  /**
   * Handler for a message returned in response to basic_get().
   *
   * Reads the delivery arguments, stores them on the message as
   * $msg->delivery_info and returns the message.
   *
   * @param mixed $args reader over the method arguments
   * @param mixed $msg  message object; gains a delivery_info array
   * @return mixed the same $msg
   */
  protected function basic_get_ok($args, $msg)
  {
    // Array elements are evaluated top to bottom, which matches the
    // order the fields appear on the wire.
    $msg->delivery_info = array(
      "delivery_tag" => $args->read_longlong(),
      "redelivered" => $args->read_bit(),
      "exchange" => $args->read_shortstr(),
      "routing_key" => $args->read_shortstr(),
      "message_count" => $args->read_long()
    );

    return $msg;
  }
  /**
   * Publish a message.
   *
   * Sends method frame (60, 40), then hands the message's serialized
   * properties and body to the connection via send_content(). No reply
   * is awaited.
   *
   * @param mixed  $msg         message exposing ->body and ->serialize_properties()
   * @param string $exchange    written as a short string
   * @param string $routing_key written as a short string
   * @param mixed  $mandatory   flag bit
   * @param mixed  $immediate   flag bit
   * @param mixed  $ticket      ticket to send; anything loosely equal to NULL selects $this->default_ticket
   */
  public function basic_publish($msg, $exchange="", $routing_key="",
      $mandatory=false, $immediate=false,
      $ticket=NULL)
  {
    $frame = new AMQPWriter();
    $frame->write_short(($ticket != NULL) ? $ticket : $this->default_ticket);
    $frame->write_shortstr($exchange);
    $frame->write_shortstr($routing_key);
    $frame->write_bit($mandatory);
    $frame->write_bit($immediate);
    $this->send_method_frame(array(60, 40), $frame);

    // strlen() gives the body length in bytes.
    $this->connection->send_content(
      $this->channel_id,
      60,
      0,
      strlen($msg->body),
      $msg->serialize_properties(),
      $msg->body
    );
  }
  /**
   * Specify quality of service.
   *
   * Sends method frame (60, 10) and waits for
   * Channel.basic_qos_ok ("60,11").
   *
   * @param mixed $prefetch_size  written as a long
   * @param mixed $prefetch_count written as a short
   * @param mixed $a_global       flag bit
   * @return mixed result of wait() for the basic_qos_ok reply
   */
  public function basic_qos($prefetch_size, $prefetch_count, $a_global)
  {
    $frame = new AMQPWriter();
    $frame->write_long($prefetch_size);
    $frame->write_short($prefetch_count);
    $frame->write_bit($a_global);
    $this->send_method_frame(array(60, 10), $frame);

    $expected = array("60,11"); // Channel.basic_qos_ok
    return $this->wait($expected);
  }
  /**
   * Handler for the qos confirmation.
   * Intentionally a no-op: nothing is read from $args and nothing is returned.
   *
   * @param mixed $args method arguments; unused
   */
  protected function basic_qos_ok($args)
  {
    // nothing to do
  }
  /**
   * Ask for redelivery of unacknowledged messages.
   *
   * Sends method frame (60, 100); no reply is awaited.
   *
   * @param mixed $requeue flag bit
   */
  public function basic_recover($requeue=false)
  {
    $frame = new AMQPWriter();
    $frame->write_bit($requeue);

    $this->send_method_frame(array(60, 100), $frame);
  }
  /**
   * Reject an incoming message.
   *
   * Sends method frame (60, 90); no reply is awaited.
   *
   * @param mixed $delivery_tag written as a long-long
   * @param mixed $requeue      flag bit
   */
  public function basic_reject($delivery_tag, $requeue)
  {
    $frame = new AMQPWriter();
    $frame->write_longlong($delivery_tag);
    $frame->write_bit($requeue);

    $this->send_method_frame(array(60, 90), $frame);
  }
  /**
   * Handler for a returned (undeliverable) message.
   *
   * Consumes the four method arguments and then calls wait() with no
   * method filter. None of the values read, nor the result of wait(),
   * is used or returned.
   *
   * NOTE(review): the returned message is effectively discarded here —
   * confirm whether callers are expected to be notified.
   *
   * @param mixed $args reader over the method arguments
   */
  protected function basic_return($args)
  {
    // Read in wire order; values are intentionally not kept.
    $args->read_short();    // reply code
    $args->read_shortstr(); // reply text
    $args->read_shortstr(); // exchange
    $args->read_shortstr(); // routing key

    $this->wait();
  }
  /**
   * Commit the current transaction.
   *
   * Sends method frame (90, 20) with no arguments and waits for
   * Channel.tx_commit_ok ("90,21").
   *
   * @return mixed result of wait() for the tx_commit_ok reply
   */
  public function tx_commit()
  {
    $this->send_method_frame(array(90, 20));

    $expected = array("90,21"); // Channel.tx_commit_ok
    return $this->wait($expected);
  }
  /**
   * Handler for the commit confirmation.
   * Intentionally a no-op: nothing is read from $args and nothing is returned.
   *
   * @param mixed $args method arguments; unused
   */
  protected function tx_commit_ok($args)
  {
    // nothing to do
  }
  /**
   * Abandon the current transaction.
   *
   * Sends method frame (90, 30) with no arguments and waits for
   * Channel.tx_rollback_ok ("90,31").
   *
   * @return mixed result of wait() for the tx_rollback_ok reply
   */
  public function tx_rollback()
  {
    $this->send_method_frame(array(90, 30));

    $expected = array("90,31"); // Channel.tx_rollback_ok
    return $this->wait($expected);
  }
  /**
   * Handler for the rollback confirmation.
   * Intentionally a no-op: nothing is read from $args and nothing is returned.
   *
   * @param mixed $args method arguments; unused
   */
  protected function tx_rollback_ok($args)
  {
    // nothing to do
  }
  /**
   * Select standard transaction mode.
   *
   * Sends method frame (90, 10) with no arguments and waits for
   * Channel.tx_select_ok ("90,11").
   *
   * @return mixed result of wait() for the tx_select_ok reply
   */
  public function tx_select()
  {
    $this->send_method_frame(array(90, 10));

    $expected = array("90,11"); // Channel.tx_select_ok
    return $this->wait($expected);
  }
  /**
   * Handler for the transaction-mode confirmation.
   * Intentionally a no-op: nothing is read from $args and nothing is returned.
   *
   * @param mixed $args method arguments; unused
   */
  protected function tx_select_ok($args)
  {
    // nothing to do
  }
  1333. }
  /**
   * A Message for use with the Channel.basic_* methods.
   *
   * Holds a body plus a set of properties whose names and wire types
   * are listed in $PROPERTIES.
   */
  class AMQPMessage extends GenericContent
  {
    /**
     * Map of property name => wire type name, passed to the
     * GenericContent constructor.
     * NOTE(review): the type names are presumably used by GenericContent
     * to select the matching read_/write_ routine — confirm there. Every
     * value must therefore be a real type name; "cluster_id" was
     * previously misspelled as "shortst".
     */
    protected static $PROPERTIES = array(
      "content_type" => "shortstr",
      "content_encoding" => "shortstr",
      "application_headers" => "table",
      "delivery_mode" => "octet",
      "priority" => "octet",
      "correlation_id" => "shortstr",
      "reply_to" => "shortstr",
      "expiration" => "shortstr",
      "message_id" => "shortstr",
      "timestamp" => "timestamp",
      "type" => "shortstr",
      "user_id" => "shortstr",
      "app_id" => "shortstr",
      "cluster_id" => "shortstr"
    );

    /**
     * @param string $body       message payload
     * @param mixed  $properties initial property values, forwarded to
     *                           GenericContent unchanged
     */
    public function __construct($body = '', $properties = null)
    {
      $this->body = $body;
      parent::__construct($properties, self::$PROPERTIES);
    }
  }
  1361. ?>