Retry with SNS and Symfony Messenger

10 minute read

In a previous post I described how we are using Amazon SNS with Symfony Messenger. But what I didn’t mention is how we handle failures. Sure, we write perfect code with tonnes of testing but failures will still happen. To ensure that each message is successfully handled we need to implement a retry mechanism for failed messages.

The general idea is to put failed messages on a queue and then republish all the messages from that queue later. Hopefully we will detect and fix the issue before next time the message is published on SNS.

To achieve this we first need to modify our SnsConsumer to catch all exceptions:

// src/Consumer/SnsConsumer.php
declare(strict_types=1);

namespace App\Consumer\SnsConsumer;

use Psr\Log\LoggerInterface;
use Rawls\QueueAdapter\Channel;
use Rawls\QueueAdapter\Message;
use Symfony\Component\Messenger\MessageBusInterface;

final class SnsConsumer
{
    private $bus;
    private $serializer;
    private $amqpChannel;
    private $logger;

    public function __construct(
        MessageBusInterface $bus,
        SerializerInterface $serializer,
        Channel $amqpChannel,
        LoggerInterface $logger
    ) {
        $this->bus = $bus;
        $this->serializer = $serializer;
        $this->amqpChannel = $amqpChannel;
        $this->logger = $logger;
    }

    public function consume(array $event)
    {
        try {
            $envelope = $this->serializer->decode(['body' => $event['Message']]);
            $this->bus->dispatch($envelope);
        } catch (\Throwable $e) {
            $this->logger->critical('Could not consume message from SNS.', [
                'exception' => $e,
                'category' => 'sns',
            ]);

            $mqMessage = new Message($event['Message'], [
                'delivery_mode' => 2, // Persistent
                'content_type' => 'application/json',
                'headers' => [
                    'SnsTopicArn' => $event['TopicArn'],
                    'x-delay' => 15000, // Wait 15 seconds before processing
                ],
            ]);
            $this->amqpChannel->publish($mqMessage, 'sns_retry');
        }
    }
}

The Rawls\QueueAdapter is just Happyr’s abstraction layer over AMQP. It makes the code look a little cleaner, that is all.

We decorate the message with some metadata to the message like what SnsTopicArn we should republish the message at. Since we have the Rabbitmq delayed message plugins we also add the 'x-delay header.

We modified our bin/message-consumer slightly to allow the SnsConsumer to see more of the SNS event:

#!/usr/bin/env php
<?php

use App\Kernel;
use App\Consumer\SnsConsumer;

require_once \dirname(__DIR__).'/vendor/autoload.php';
require_once \dirname(__DIR__).'/config/bootstrap.php';

lambda(static function (array $event) {
    $kernel = new Kernel($_SERVER['APP_ENV'], (bool) $_SERVER['APP_DEBUG']);
    $kernel->boot();
    $container = $kernel->getContainer();

    /** @var SnsConsumer $consumer */
    $consumer = $container->get(SnsConsumer::class);

    foreach ($event['Records'] as $record) {
        if (!isset($record['Sns'])) {
            continue;
        }

        // Pass $record['Sns'] instead of $record['Sns']['Message']
        $consumer->consume($record['Sns']);
    }

    return 'OK.';
});

Great, so the failed messages ends up on an amqp exchange called sns_retry and are sent to a queue. Lets create a new application that reads from that queue and republish messages back on SNS. We will of course use Bref and Lambda for this application and then configure it to run periodically. This is my src/publish.php:

// src/publish.php
declare(strict_types=1);

/*
 * Expect the following environment variables:
 * AWS_TARGET_REGION=eu-central-1
 * AWS_ACCESS_KEY_ID=foo
 * AWS_SECRET_ACCESS_KEY=bar
 * HAPPYR_MQ=amqp://.../lambda_retry
 */

use Rawls\QueueAdapter\ChannelFactory;

include \dirname(__DIR__).'/vendor/autoload.php';

$channel = ChannelFactory::create(\getenv('HAPPYR_MQ'));

$publisher = new \Aws\Sns\SnsClient([
    'version' => '2010-03-31',
    'region' => \getenv('AWS_TARGET_REGION'),
]);

lambda(function (array $event) use ($channel, $publisher) {
    echo 'Reading from queue: '.$event['queue'];
    $count = 0;
    while ($message = $channel->get(false, $event['queue'])) {
        $attr = $message->getAttributes();
        if (!isset($attr['headers']['SnsTopicArn'])) {
            echo "No SnsTopicArn found \n";

            continue;
        }

        try {
            $publisher->publish([
                'TopicArn' => $attr['headers']['SnsTopicArn'],
                'Message' => $message->getMessage(),
            ]);
        } catch (\Throwable $throwable) {
            echo "Could not publish message \n";
            echo \json_encode($attr)."\n";
            echo $message->getMessage()."\n";
            echo $throwable->getMessage()."\n";

            // Start handling other messages.
            continue;
        }

        // If we cannot publish the message, we will never acknowledge it.
        $message->ack();
        ++$count;
    }

    if ($count > 0) {
        echo \sprintf('Republished %d messages.', $count);
    }
});

Quite simple. Just read a message and publish. We could be done here, but these little scripts are missing two features:

  • Multiple SNS subscribers
  • Retry back-off

Multiple SNS subscribers

If you have multiple applications subscribed to the same SNS topic and one of them fails. You dont want all applications to retry the message. Only the failed one.

We can achieve this with adding an application_id to the message. If the application id is empty or matching this application, then we should process the message.

 // src/Consumer/SnsConsumer.php

 final class SnsConsumer
 {
     private $applicationId;
     
     // ...
     
     public function consume(array $event)
     {
         $messageAppId = $event['MessageAttributes']['application_id']['Value'] ?? null;
         if (!empty($messageAppId) && $messageAppId !== $this->applicationId) {
             // This message is not for us to retry
             return;
         }
         
         // ...
         
         // Send application_id to the queue. 
         $mqMessage = new Message($event['Message'], [
             'delivery_mode' => 2, // Persistent
             'content_type' => 'application/json',
             'headers' => [
                 'SnsTopicArn' => $event['TopicArn'],
                 'application_id' => $this->applicationId,
             ],
         ]);
         $this->amqpChannel->publish($mqMessage, 'sns_retry');
     }
 }
 
 

The publish.php needs to forward the application_id to the SNS message using “MessageAttributes”:

// src/publish.php

// ...

lambda(function (array $event) use ($channel, $publisher) {
    // ...
    
    $publisher->publish([
        'TopicArn' => $attr['headers']['SnsTopicArn'],
        'Message' => $message->getMessage(),
        'MessageAttributes' => [
            'application_id' => ['DataType' => 'String', 'StringValue' => ''.$attr['headers']['application_id'] ?? ''],
        ],
    ]);
    
    // ...
});

Retry back-off

If a message just failed, it might have been because some small glitch in the network. So we want to retry processing the message pretty quickly. However, if a message has failed a few times, we maybe want to retry it every third hour or something similar. Hopfully some developer has deployed a fix so the message will not fail again.

To implement this back-off we use an AMQP topic exchange and different routing keys. We decided that we wanted to use 3 queues:

  • sns_retry_0 (runs every minute)
  • sns_retry_1 (runs every 10 minutes)
  • sns_retry_2 (runs second hour)

The SnsConsumer needs to keep track of how many time we retried a message and choose different topics accordligly.

// src/Consumer/SnsConsumer.php

final class SnsConsumer
{    
    // ..

    public function consume(array $event)
    {
        // ...
        $retryAttempt = (int) ($event['MessageAttributes']['retry_attempt']['Value'] ?? 0);
        $mqMessage = new Message($event['Message'], [
            'delivery_mode' => 2, // Persistent
            'content_type' => 'application/json',
            'headers' => [
                'SnsTopicArn' => $event['TopicArn'],
                'retry_attempt' => $retryAttempt + 1,
                'x-delay' => 0 === $retryAttempt ? 0 : 15000, // Wait 15 seconds before processing
            ],
        ]);
        $this->channel->publish($mqMessage, 'sns_retry', $this->getRoutingKeyFromRetryAttempt($retryAttempt));
    }
    
    private function getRoutingKeyFromRetryAttempt(int $retries): string
    {
        switch ($retries) {
            case 0:
            case 1:
                return 'retry_0';
            case 2:
            case 3:
                return 'retry_1';
            default:
                return 'retry_2';
        }
    }
}

Now we need to configure the publish.php´ to forward the retry_attempt` header:

// src/publish.php

// ...

lambda(function (array $event) use ($channel, $publisher) {
    // ...
    
    $publisher->publish([
        'TopicArn' => $attr['headers']['SnsTopicArn'],
        'Message' => $message->getMessage(),
        'MessageAttributes' => [
              'retry_attempt' => ['DataType' => 'String', 'StringValue' => ''.$attr['headers']['retry_attempt'] ?? 1],
        ],
    ]);
    
    // ...
});

Everything works fine. When deploying this we need to make sure we use 3 different Lambda functions to read from the different queues.

The complete code examples

Bellow are SnsConsumer and publish.php with all features included.

// src/Consumer/SnsConsumer.php
declare(strict_types=1);

namespace Rawls\SnsConsumer;

use Psr\Log\LoggerInterface;
use Rawls\QueueAdapter\Channel;
use Rawls\QueueAdapter\Message;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;

final class SnsConsumer
{
    private $applicationId;
    private $bus;
    private $serializer;
    private $channel;
    private $logger;

    public function __construct(
        string $applicationId,
        MessageBusInterface $bus,
        SerializerInterface $serializer,
        Channel $channel,
        LoggerInterface $logger
    ) {
        if (empty($applicationId)) {
            throw new \LogicException('Application id cannot be empty');
        }
        $this->applicationId = $applicationId;
        $this->bus = $bus;
        $this->serializer = $serializer;
        $this->channel = $channel;
        $this->logger = $logger;
    }

    public function consume(array $event)
    {
        $messageAppId = $event['MessageAttributes']['application_id']['Value'] ?? null;
        if (!empty($messageAppId) && $messageAppId !== $this->applicationId) {
            // This message is not for us to retry
            return;
        }

        try {
            $envelope = $this->serializer->decode(['body' => $event['Message']]);
            $this->bus->dispatch($envelope);
        } catch (\Throwable $e) {
            $retryAttempt = (int) ($event['MessageAttributes']['retry_attempt']['Value'] ?? 0);
            $this->logger->critical('Could not consume message from SNS.', [
                'exception' => $e,
                'category' => 'sns',
                'retry_attempt' => $retryAttempt,
            ]);

            $mqMessage = new Message($event['Message'], [
                'delivery_mode' => 2, // Persistent
                'content_type' => 'application/json',
                'headers' => [
                    'SnsTopicArn' => $event['TopicArn'],
                    'retry_attempt' => $retryAttempt + 1,
                    'application_id' => $this->applicationId,
                    'x-delay' => 0 === $retryAttempt ? 0 : 15000, // Wait 15 seconds before processing
                ],
            ]);
            $this->channel->publish($mqMessage, 'sns_retry', $this->getRoutingKeyFromRetryAttempt($retryAttempt));
        }
    }

    private function getRoutingKeyFromRetryAttempt(int $retries): string
    {
        switch ($retries) {
            case 0:
            case 1:
                return 'retry_0';
            case 2:
            case 3:
                return 'retry_1';
            default:
                return 'retry_2';
        }
    }
}
// src/publish.php

declare(strict_types=1);

/*
 * Expect the following environment variables:
 * AWS_TARGET_REGION=eu-central-1
 * AWS_ACCESS_KEY_ID=foo
 * AWS_SECRET_ACCESS_KEY=bar
 * HAPPYR_MQ=amqp://.../lambda_retry
 */

use Rawls\QueueAdapter\ChannelFactory;
use Rawls\QueueAdapter\Impl\AmqpExtChannel;

include \dirname(__DIR__).'/vendor/autoload.php';

$channel = ChannelFactory::create(\getenv('HAPPYR_MQ'));
if (!$channel instanceof AmqpExtChannel) {
    echo 'We need an instance of '.AmqpExtChannel::class;

    return;
}

$publisher = new \Aws\Sns\SnsClient([
    'version' => '2010-03-31',
    'region' => \getenv('AWS_TARGET_REGION'),
]);

lambda(function (array $event) use ($channel, $publisher) {
    if (!isset($event['queue'])) {
        throw new RuntimeException('Queue must be defined');
    }

    echo 'Reading from queue: '.$event['queue'];
    $count = 0;
    while ($message = $channel->get(false, $event['queue'])) {
        $attr = $message->getAttributes();
        if (!isset($attr['headers']['SnsTopicArn'])) {
            echo "No SnsTopicArn found \n";

            continue;
        }

        try {
            $publisher->publish([
                'TopicArn' => $attr['headers']['SnsTopicArn'],
                'Message' => $message->getMessage(),
                'MessageAttributes' => [
                    'application_id' => ['DataType' => 'String', 'StringValue' => ''.$attr['headers']['application_id'] ?? ''],
                    'retry_attempt' => ['DataType' => 'String', 'StringValue' => ''.$attr['headers']['retry_attempt'] ?? 1],
                ],
            ]);
        } catch (\Throwable $throwable) {
            echo "Could not publish message \n";
            echo \json_encode($attr)."\n";
            echo $message->getMessage()."\n";
            echo $throwable->getMessage()."\n";

            // Start handling other messages.
            continue;
        }

        // If we cannot publish the message, we will never ack it.
        $message->ack();
        ++$count;
    }

    if ($count > 0) {
        echo \sprintf('Republished %d messages.', $count);
    }
});

My Bref config looks as follows:

AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31

Globals:
    Function:
        Environment:
            Variables:
                AWS_TARGET_REGION: eu-central-1
                HAPPYR_MQ: ''

Resources:
    Consumer:
        Type: AWS::Serverless::Function
        Properties:
            FunctionName: 'sns-republish'
            Role: 'arn:aws:iam::xxxxxxxxx:role/lambda_sns-republisher' # Role that allows to publish messages
            CodeUri: .
            Handler: src/publish.php
            Timeout: 10 # in seconds
            MemorySize: 1024
            Runtime: provided
            Layers:
                - 'arn:aws:lambda:eu-central-1:xxxxxxxxx:layer:bref-php-with-amqp-73:1'
            VpcConfig:
                SecurityGroupIds:
                    - sg-xxxxxxxxx
                SubnetIds:
                    - subnet-xxxxxxxxx

            Events:
                Retry0:
                    Type: Schedule
                    Properties:
                        Schedule: cron(0/1 * * * ? *)
                        Input: '{"queue": "sns_retry_0"}'
                Retry1:
                    Type: Schedule
                    Properties:
                        Schedule: cron(0/10 * * * ? *)
                        Input: '{"queue": "sns_retry_1"}'
                Retry2:
                    Type: Schedule
                    Properties:
                        Schedule: cron(0 0/2 * * ? *)
                        Input: '{"queue": "sns_retry_2"}'

I hope this post was helpful. Please give me your thoughts in the comments below.

Leave a Comment