amqp_consumer.php 1.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455
  1. #!/usr/bin/php
  2. <?php
  3. /**
  4. * Repeatedly receive messages from queue until it receives a message with
  5. * 'quit' as the body.
  6. *
  7. * @author Sean Murphy<sean@iamseanmurphy.com>
  8. */
  9. require_once('../amqp.inc');
  10. $HOST = 'localhost';
  11. $PORT = 5672;
  12. $USER = 'guest';
  13. $PASS = 'guest';
  14. $VHOST = '/';
  15. $EXCHANGE = 'airtime-schedule';
  16. $QUEUE = 'msgs';
  17. $CONSUMER_TAG = 'consumer';
  18. $conn = new AMQPConnection($HOST, $PORT, $USER, $PASS);
  19. $ch = $conn->channel();
  20. $ch->access_request($VHOST, false, false, true, true);
  21. $ch->queue_declare($QUEUE);
  22. $ch->exchange_declare($EXCHANGE, 'direct', false, true);
  23. $ch->queue_bind($QUEUE, $EXCHANGE);
  24. function process_message($msg) {
  25. global $ch, $CONSUMER_TAG;
  26. echo "\n--------\n";
  27. echo $msg->body;
  28. echo "\n--------\n";
  29. $ch->basic_ack($msg->delivery_info['delivery_tag']);
  30. // Cancel callback
  31. if ($msg->body === 'quit') {
  32. $ch->basic_cancel($CONSUMER_TAG);
  33. }
  34. }
  35. $ch->basic_consume($QUEUE, $CONSUMER_TAG, false, false, false, false, 'process_message');
  36. // Loop as long as the channel has callbacks registered
  37. echo "Waiting for messages...\n";
  38. while(count($ch->callbacks)) {
  39. $ch->wait();
  40. }
  41. $ch->close();
  42. $conn->close();
  43. ?>