amqp_receive.php 1.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061
  1. <?php
  2. //AMQP PHP library test
  3. require_once('amqp.inc');
  4. $EXCHANGE = 'test';
  5. $BROKER_HOST = 'localhost';
  6. $BROKER_PORT = 5672;
  7. $QUEUE = 'myqueue';
  8. $USER ='guest';
  9. $PASSWORD ='guest';
  10. $msg_body = NULL;
  11. $myCallback = function($msg) {
  12. var_dump($msg);
  13. };
  14. try
  15. {
  16. echo "Creating connection\n";
  17. $conn = new AMQPConnection($BROKER_HOST, $BROKER_PORT,
  18. $USER,
  19. $PASSWORD);
  20. echo "Getting channel\n";
  21. $ch = $conn->channel();
  22. echo "Requesting access\n";
  23. $ch->access_request('/data', false, false, true, true);
  24. echo "Declaring exchange\n";
  25. $ch->exchange_declare($EXCHANGE, 'direct', false, false, false);
  26. echo "Declaring queue\n";
  27. $ch->queue_declare($QUEUE, false, true, false, false);
  28. echo "Binding queue to exchange\n";
  29. $ch->queue_bind($QUEUE, $EXCHANGE);
  30. echo "Receiving message\n";
  31. $ch->basic_consume($QUEUE, $consumer_tag, false, false, false, false, $myCallback);
  32. //$ch->basic_consume($EXCHANGE, "tag", false, false, false, false, 'myCallback');
  33. echo "Waiting\n";
  34. while (count($ch->callbacks)) {
  35. $ch->wait();
  36. }
  37. echo "Closing channel\n";
  38. $ch->close();
  39. echo "Closing connection\n";
  40. $conn->close();
  41. echo "Done.\n";
  42. } catch (Exception $e) {
  43. echo 'Caught exception: ', $e->getMessage();
  44. echo "\nTrace:\n" . $e->getTraceAsString();
  45. }
  46. ?>