我创建了一个简单的发布者和消费者,使用basic.consume在队列上预订.
我的消费者承认在作业运行时没有异常的消息.每当我遇到一个例外情况,我都不会回信,早点回来.只有确认的消息从队列中消失,这样才能正常工作.
现在我希望消费者再次收到失败的消息,但重新发送这些消息的唯一方法是重新启动消费者.
我该如何处理这个用例?
安装代码
$channel = new AMQPChannel($connection);
$exchange = new AMQPExchange($channel);
$exchange->setName('my-exchange');
$exchange->setType('fanout');
$exchange->declare();
$queue = new AMQPQueue($channel);
$queue->setName('my-queue');
$queue->declare();
$queue->bind('my-exchange');
消费者代码
$queue->consume(array($this,'callback'));
public function callback(AMQPEnvelope $msg)
{
try {
//Do some business logic
} catch (Exception $ex) {
//Log exception
return;
}
return $queue->ack($msg->getDeliveryTag());
}
生产者代码
$exchange->publish('message');
如果消息未被确认并且应用程序发生故障,则将自动重新发送,并且将信封上的重新传递属性设置为true(除非使用no-ack = true标志消费).
UPD:
您必须在catch块中使用重新发送标志来消息
try {
//Do some business logic
} catch (Exception $ex) {
//Log exception
return $queue->nack($msg->getDeliveryTag(),AMQP_REQUEUE);
}
当RabbitMQ和AMQP协议中没有实现重新传递计数时,请注意无限制的Nacked消息.
如果你不想混淆这样的消息,只是想添加一些延迟,你可能想在nack方法调用之前添加一些sleep()或usleep(),但这根本不是一个好主意.
有多种技术来处理循环重传问题:
依靠Dead Letter Exchanges
>职业:可靠,标准,清晰
cons:需要额外的逻辑
2.使用per message or per queue TTL
>职业:容易实施,也是标准,明确
cons:长排队你可能会丢掉一些信息
示例(注意,对于队列ttl,我们只传递数字和消息ttl – 任何将是数字字符串):
2.1每消息ttl:
$queue = new AMQPQueue($channel);
$queue->setName('my-queue');
$queue->declareQueue();
$queue->bind('my-exchange');
$exchange->publish(
'message at ' . microtime(true),null,AMQP_noparaM,array(
'expiration' => '1000'
)
);
2.2.每队列ttl:
$queue = new AMQPQueue($channel);
$queue->setName('my-queue');
$queue->setArgument('x-message-ttl',1000);
$queue->declareQueue();
$queue->bind('my-exchange');
$exchange->publish('message at ' . microtime(true));
3.在消息正文或标题中保留重新传送者计数或者重新设置重定向器号码(也称为IP堆栈中的跳数限制或ttl)
>专业人士:在应用程序级别上给予您对消息生命周期的额外控制
> cons:显着的开销,而你必须修改消息并再次发布,具体应用,不清楚
码:
$queue = new AMQPQueue($channel);
$queue->setName('my-queue');
$queue->declareQueue();
$queue->bind('my-exchange');
$exchange->publish(
'message at ' . microtime(true),array(
'headers' => array(
'ttl' => 100
)
)
);
$queue->consume(
function (AMQPEnvelope $msg,AMQPQueue $queue) use ($exchange) {
$headers = $msg->getHeaders();
echo $msg->isRedelivery() ? 'redelivered' : 'origin',' ';
echo $msg->getDeliveryTag(),' ';
echo isset($headers['ttl']) ? $headers['ttl'] : 'no ttl',' ';
echo $msg->getBody(),PHP_EOL;
try {
//Do some business logic
throw new Exception('business logic Failed');
} catch (Exception $ex) {
//Log exception
if (isset($headers['ttl'])) {
// with ttl logic
if ($headers['ttl'] > 0) {
$headers['ttl']--;
$exchange->publish($msg->getBody(),$msg->getRoutingKey(),array('headers' => $headers));
}
return $queue->ack($msg->getDeliveryTag());
} else {
// without ttl logic
return $queue->nack($msg->getDeliveryTag(),AMQP_REQUEUE); // or drop it without requeue
}
}
return $queue->ack($msg->getDeliveryTag());
}
);
可能还有其他一些方法可以更好地控制消息重传流.
结论:没有银弹解决方案.你必须决定什么解决方案适合你的需要最好或找出其他的东西,但不要忘记在这里分享)