AWS SNS & SQS Event Bus

Wednesday, May 29, 2019

Even when you have a monolithic architecture, like we do at https://dropconfig.com, you want to do things asynchronously and be able to respond to events.

For example, if a user is created we may want to:

  • Update billing information
  • Send the User an email
  • Send the person who invited them an email

Doing this all in one function is fine, but we can do better with an event bus.

SNS

AWS SNS is a publish/subscribe messaging service. You can deliver messages to HTTP endpoints, AWS Lambda functions, or even SQS queues. It makes it easy to send a message to one place and have it passed along to anyone who wants to listen.

SQS

AWS SQS is a dead simple queue. SNS can write messages to it with the click of a button in the AWS console. Then we can read from that queue in whatever fashion we want.
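
The console is the easy path, but if you would rather wire the topic to a queue from code, a rough sketch with the AWS SDK might look like the following (the ARNs and URLs are placeholders, and you still need a queue policy that allows SNS to send to the queue):

const aws = require("aws-sdk");

const sns = new aws.SNS({region: "us-east-1"});
const sqs = new aws.SQS({region: "us-east-1"});

async function subscribeQueueToTopic(topicArn, queueUrl) {
    // Look up the queue's ARN so we can hand it to SNS.
    const attrs = await sqs.getQueueAttributes({
        QueueUrl: queueUrl,
        AttributeNames: ["QueueArn"]
    }).promise();

    // Subscribe the queue to the topic. SNS will now deliver
    // every message published to the topic into this queue.
    return sns.subscribe({
        TopicArn: topicArn,
        Protocol: "sqs",
        Endpoint: attrs.Attributes.QueueArn
    }).promise();
}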

Events

First off, we want to start sending events to SNS so it can forward that message to all of our Queues.

I will be using Node.js for all of the code examples.

// We are assuming we already created a topic in the AWS console.
const aws = require("aws-sdk");
const sns = new aws.SNS({region: "us-east-1"});

const params = {
    Message: JSON.stringify({
        type: "some-event-type",
        data: {
            "some": "data"
        }
    }),
    TopicArn: SnsTopic // Create a topic in the SNS console and use its ARN here.
}
sns.publish(params).promise() // We can async/await or just do promise stuff here.

As you can see, sending a message is pretty easy. We might want to set some MessageAttributes, but we will cover that later.
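
Later on you will see calls to server.createEvent(type, data); under the hood that can just be a thin wrapper around this publish call. A minimal sketch (how you attach it to your server object is up to you):

const aws = require("aws-sdk");
const sns = new aws.SNS({region: "us-east-1"});

// Hypothetical helper: every event in the app goes through here,
// so handlers can just call server.createEvent(type, data).
function createEvent(type, data) {
    return sns.publish({
        Message: JSON.stringify({type, data}),
        TopicArn: SnsTopic // same topic ARN as above
    }).promise();
}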

Event Handlers

We are setting this up so that SNS publishes these to SQS. We also want to have a queue per task type that we want to run.

E.g. Billing tasks would have a queue separate from Email tasks.

You can set up AWS Lambda to read from your queues, and AWS will scale it up as needed.
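
If you go the Lambda route, the handler just loops over the batch of SQS records it is handed; a minimal sketch (note that the SNS envelope still has to be unwrapped):

// Minimal sketch of a Lambda handler for an SQS trigger.
exports.handler = async (event) => {
    for (const record of event.Records) {
        // The SQS body is the SNS envelope; the real payload is in .Message.
        const envelope = JSON.parse(record.body);
        const message = JSON.parse(envelope.Message);
        console.log("got event", message.type, message.data);
    }
    // Throwing here instead would put the whole batch back on the queue.
};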

But, as mentioned before, we want to keep most of our code in our Node app, so how do we read from SQS and react to events?

We first want something to manage a queue and run workers:

// We have a max number of workers that can run at once.
// We don't want to melt our server.
const MAX_WORKERS = 10;
let currentWorkers = 0;
async function createTaskRunner(sqs, queue, server) {
  let running = false;

  // we have a copy of this running async for each task-type
  async function start() {
    running = true;
    while (running) {
      // This is a soft target, because we may reenter here
      // while async waiting on the tasks. In other words:
      // - currentWorkers is just under MAX_WORKERS
      // - we are good, so we poll for a task
      // - polling can take a while (long polling)
      // - meanwhile other runners push currentWorkers up to MAX_WORKERS
      // - our poll still returns a task and we run it anyway
      //
      // So this will overshoot a little.
      if (currentWorkers < MAX_WORKERS) {
        const task = await checkQueue(sqs, queue);
        if (task) {

          // this is run without an await here to allow
          // running multiple workers of the same type
          // in parallel
          runTask(task, queue, sqs, server);
        }
      }
      await wait(1000);
    }
  }

  return {
    start,
    stop: () => {
      running = false
    }
  }
}
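
createTaskRunner leans on a small wait helper that isn't shown above; it's just a promise-wrapped setTimeout:

// Sleep for the given number of milliseconds.
function wait(ms) {
  return new Promise((resolve) => setTimeout(resolve, ms));
}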

This function will manage the queue and bring up workers to handle events that show up in it. Next we want to define checkQueue, which checks whether there is even a task to run.

async function checkQueue(sqs, queue) {
  const params = {
    QueueUrl: queue.url,
    MaxNumberOfMessages: 1,
    // WaitTimeSeconds is important.
    // The `await` below will wait until it gets something from the queue
    // or 20 seconds have passed before returning.
    // This way we don't keep polling over and over as fast as possible.
    WaitTimeSeconds: 20,
  }
  const res = await sqs.receiveMessage(params).promise();
  if (res.Messages && res.Messages.length) {
    const message = res.Messages[0];
    let messageBody;
    try {
      // SNS wraps our payload in an envelope; the real event is in .Message.
      const data = JSON.parse(message.Body);
      messageBody = JSON.parse(data.Message);
    }
    catch (e) {
      messageBody = message.Body
    }

    const task = {
      id: message.MessageId,
      receipt: message.ReceiptHandle,
      queue: queue.url,
      data: messageBody,
      message: message
    }
    return task;
  } else {
    return null;
  }
}

Now let’s check out runTask. We put it in a separate function from createTaskRunner so we can have multiple workers on a queue at a time.

async function runTask(task, queue, sqs, server) {
  currentWorkers = currentWorkers + 1
  const taskSummary = {
    type: queue.type,
    id: task.id
  }

  try {
    const complete = await queue.handler(task, queue, sqs, server)
    if (complete) {
      // The handler succeeded, so remove the message from the queue.
      await sqs.deleteMessage({
        QueueUrl: queue.url,
        ReceiptHandle: task.receipt
      }).promise();
    }
    else {
      // We even create events in our event handlers.
      server.createEvent(TASK_WORKER_FAILED, {
          taskSummary,
          complete
      });
    }
  } catch (e) {
    server.createEvent(TASK_WORKER_FAILED, {
        taskSummary,
        e
    });
  }
  currentWorkers = currentWorkers - 1
}

A task worker is a stateless function that gets data from the runTask function, does its thing, and returns whether or not it succeeded on that event.

Let’s look at an example for sending an email.

exports.handler = (task, queue, sqs, server) => {
   let to;
   let message;

   // We can handle multiple types of events here.
   if(task.data.type === "USER_CREATED"){
        to = task.data.data.user.email;
        message = "WELCOME TO THE SITE!"
   }
   if(task.data.type === "USER_COMMENT"){
        to = task.data.data.post.creator.email;
        message = "SOMEONE COMMENTED ON YOUR POST";
   }

   if(to && message){
        //This is obviously simplified. 
        //Sending emails is not part of this article.
        sendEmail(to, message);
        
   }

   // If we get an event type we don't care about, we just ignore it.
   return true;

}

So now we just have one last piece to put together: initializing the task queues.

const taskQueues = [
    {type: "email", url: "https://url-to-email-queue.aws.amazon.com/", handler: require("./emailHandler")},
    {type: "billing", url: "https://url-to-billing-queue.aws.amazon.com", handler: require("./billingHandler")}
]
async function init(sqs, server) {
  const runners = [];
  for (const queue of taskQueues) {
    const runner = await createTaskRunner(sqs, queue, server);
    runners.push(runner);
    runner.start();
  }
  return runners;
}

Now we have 2 task runners listening for messages in their respective queues.
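
Because each runner exposes stop(), we can also wind things down cleanly when the process is asked to exit. A small sketch (assuming init is called once at startup, as above):

init(sqs, server).then((runners) => {
  // Hypothetical shutdown hook: stop polling so in-flight workers
  // can finish their current task before the process exits.
  process.on("SIGTERM", () => {
    runners.forEach((runner) => runner.stop());
  });
});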

Limiting messages to queues

Say you have a lot of messages getting passed around, but your email queue only cares about a few types and ignores the rest. Luckily, SNS has our backs here with the ability to filter the messages coming to our queue: https://docs.aws.amazon.com/sns/latest/dg/sns-message-filtering.html

We just have to use the MessageAttributes I mentioned earlier to accomplish this.

const sns = new aws.SNS({region: "us-east-1"});

const params = {
    Message: JSON.stringify({
        type: "some-event-type",
        data: {
            "some": "data"
        }
    }),

    // We add in a message attribute to filter on.
    MessageAttributes: {
        "event-type": {
            DataType: "String",
            StringValue: "some-event-type"
         }
    },
    TopicArn: SnsTopic // Create a topic in the SNS console and use its ARN here.
}
sns.publish(params).promise() // We can async/await or just do promise stuff here.

And here is the filter policy for the subscription. With this in place, the subscription will only receive events with the listed event types.

{
  "event-type": [
    "some-event-type"
  ]
}
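
You can paste that policy into the subscription in the SNS console, or set it from code with setSubscriptionAttributes; a sketch (the subscription ARN is a placeholder):

// Attach the filter policy to the SNS -> SQS subscription.
sns.setSubscriptionAttributes({
    SubscriptionArn: subscriptionArn, // ARN of the queue's subscription
    AttributeName: "FilterPolicy",
    AttributeValue: JSON.stringify({
        "event-type": ["some-event-type"]
    })
}).promise();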

So that’s a high-level overview of how you can pass messages around.

Other Benefits

Datalake

Now that we have an event bus, we can store every event we receive in some document store (we actually just log to CloudWatch). That lets you query any event that ever happened and even replay them!

Activity feeds

Say we have some activity feed that shows when a post was commented on.

We could come up with queries for the database, but that might start to get complex. Or, if we store some of our events in a document store like MongoDB, we can just ask for events of type USER_COMMENT and get them back in order!
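
For example, assuming each stored event keeps the {type, data} shape from earlier plus a createdAt timestamp (the collection and field names here are assumptions), the feed query is just:

// Hypothetical feed query against an "events" collection in MongoDB.
const feed = await db.collection("events")
    .find({type: "USER_COMMENT"})
    .sort({createdAt: -1}) // newest first
    .limit(20)
    .toArray();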

Incoming webhooks

At DropConfig we use Stripe. Rather than handling the incoming webhooks Stripe sends us directly in the webhook HTTP handler, we create events from them. That puts them on the bus, and our billing handler can take it from there.

It’s really simple:

server.route({
    path: "/api/webhooks/incoming/stripe",
    method: "POST",
    options: {
        handler: (req) => {
            server.createEvent("STRIPE_INCOMING_WEBHOOK", req.payload);
            return true
        }
    }
})

Now anyone who cares can listen for STRIPE_INCOMING_WEBHOOK events and react accordingly.
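
For example, the billingHandler referenced in the queue list earlier could follow the same contract as the email handler. A hypothetical sketch (the Stripe event shape and updateBillingStatus are stand-ins for your real billing code):

// Hypothetical billing handler reacting to Stripe webhooks off the queue.
exports.handler = async (task, queue, sqs, server) => {
    if (task.data.type === "STRIPE_INCOMING_WEBHOOK") {
        const stripeEvent = task.data.data;

        if (stripeEvent.type === "invoice.payment_failed") {
            await updateBillingStatus(stripeEvent.data.object.customer, "past_due");
        }
    }

    // Events we don't care about are simply acknowledged.
    return true;
};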

If you made it this far, thanks for reading! Please check out https://dropconfig.com. I think you’ll like it.

Feel free to comment with any questions!

This post is also available on DEV.