How we saved over a million dollars by batching SQS messages
Published:
This blog post explains how we batched SQS messages to reduce our costs.
Introduction
For the first stage, we start with the SendMessageBatch function. It lets us send up to 10 messages in a single request. Since we are charged per request and our messages are very small, we can comfortably send 10 messages together for the cost of one.
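To make the saving concrete, here is a small sketch of the request arithmetic. The name requestsNeeded is ours, chosen only for illustration, and the sketch assumes every batch is small enough to be billed as a single request (SQS bills each 64 KB chunk of payload as one request).

```go
package main

// maxBatchSize is the SQS limit on entries in one SendMessageBatch request.
const maxBatchSize = 10

// requestsNeeded returns how many requests it takes to send n messages
// when up to batchSize messages share one request.
// Assumption: each request is billed once, i.e. payloads are small.
func requestsNeeded(n, batchSize int) int {
	if n <= 0 || batchSize <= 0 {
		return 0
	}
	return (n + batchSize - 1) / batchSize // ceiling division
}
```

For 1,000 messages, requestsNeeded(1000, 1) is 1000 while requestsNeeded(1000, maxBatchSize) is 100, which is where the "up to 10 times" figure comes from. A partially filled batch still costs a full request, so the real saving depends on how full the batches are.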
The input to SendMessageBatch is the queue URL and an array of SendMessageBatchRequestEntry values. Each entry carries the same details we already pass to the SendMessage function for a single Amazon SQS message, plus an ID that identifies the entry within the batch.
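As a rough sketch of what building those entries might look like, the snippet below uses a local stand-in type instead of the real SDK type. batchEntry and buildEntries are hypothetical names. The only rule illustrated is that IDs must be unique within one request.

```go
package main

import "strconv"

// batchEntry is a hypothetical local mirror of the two fields that matter
// here: an ID unique within the request, and the message body.
// It is NOT the SDK's SendMessageBatchRequestEntry type.
type batchEntry struct {
	ID   string
	Body string
}

// buildEntries turns message bodies into batch entries, using the position
// in the batch as the ID so that IDs are unique within one request.
func buildEntries(bodies []string) []batchEntry {
	entries := make([]batchEntry, 0, len(bodies))
	for i, body := range bodies {
		entries = append(entries, batchEntry{ID: strconv.Itoa(i), Body: body})
	}
	return entries
}
```

The ID only has to be unique inside a single batch, because it is used to match each entry with its result in the response. A position index is therefore enough.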
Since multiple goroutines (user-level threads) can push messages to SQS, maintaining a shared array of at most 10 messages is not trivial. Moreover, we want a mechanism that sends whatever has accumulated at least once within a time interval (let’s call this interval t).
To summarize: call SendMessageBatch when we have received 10 messages or when time t has elapsed, whichever happens first.
Architecture
SendMessage
We make use of Go channels.
There are 3 blocks in this architecture. Each block is a separate goroutine or a set of goroutines.
S - SendMessage Block
Every goroutine that wants to send a message calls the SendMessage block, which adds the message to a channel (call it messageChannel).
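A minimal sketch of this block, assuming a simple message type with only a body. The names message, sendMessage and produce are ours for illustration. It shows that many goroutines can write to the same channel without any extra locking.

```go
package main

import "sync"

// message is a hypothetical stand-in for whatever payload the service sends.
type message struct {
	Body string
}

// sendMessage is the S block: a caller hands a message to the batcher by
// writing to the shared channel. It blocks while the channel is full,
// which gives natural backpressure.
func sendMessage(messageChannel chan<- message, body string) {
	messageChannel <- message{Body: body}
}

// produce starts n goroutines that each send one message, then waits
// for all of them to finish.
func produce(messageChannel chan<- message, n int) {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			sendMessage(messageChannel, "hello")
		}()
	}
	wg.Wait()
}
```

Because a channel is safe for concurrent senders, the problem of many goroutines sharing one array disappears: only the batching goroutine ever touches the batch.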
B - Batching Block
This goroutine keeps building the array of messages we are going to send through SendMessageBatch.
It reads messages from messageChannel and also maintains a ticker (which sends a signal on tickerChannel after every time interval t). It maintains an array of messages (call it batch) and appends every message it receives from messageChannel. When the number of messages in batch reaches 10, or when we receive a signal from tickerChannel, it pushes batch to batchChannel.
** Pseudo Code **
import "time"

// batcher is the B block. The message type and the dispatcher function
// (the D block) are assumed to be defined elsewhere.
func batcher(messageChannel <-chan message, t time.Duration) {
	ticker := time.NewTicker(t) // fires on ticker.C every t
	defer ticker.Stop()

	batchChannel := make(chan []message)
	go dispatcher(batchChannel) // sends every batch it receives to SQS

	for {
		// a fresh slice per batch, so the dispatcher owns the one it received
		batch := make([]message, 0, 10)
		for {
			select {
			case msg := <-messageChannel:
				batch = append(batch, msg)
				if len(batch) < 10 {
					// batch hasn't filled up yet, continue to wait
					continue
				}
			case <-ticker.C:
				// ticker signals that time t has passed
				if len(batch) < 1 {
					// time's up but nothing to dispatch, continue to wait
					continue
				}
			}
			batchChannel <- batch
			break // break inner loop, create a new batch
		}
	}
}
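The pseudo code above runs forever, so a partially filled batch would be lost on shutdown. Here is a self-contained variant we might use to handle that, assuming string payloads and a caller-supplied output channel. The name runBatcher and its parameters are ours for illustration. It flushes on size or on the ticker, and also flushes the remainder when the input channel is closed.

```go
package main

import "time"

// runBatcher groups messages from in into slices of at most size and sends
// each slice on out when it is full or when the ticker fires, whichever
// comes first. When in is closed it flushes the remainder and closes out.
// t must be positive, otherwise time.NewTicker panics.
func runBatcher(in <-chan string, out chan<- []string, size int, t time.Duration) {
	ticker := time.NewTicker(t)
	defer ticker.Stop()
	defer close(out)

	batch := make([]string, 0, size)
	flush := func() {
		if len(batch) == 0 {
			return // nothing to dispatch
		}
		out <- batch
		// start a fresh slice; the old one now belongs to the receiver
		batch = make([]string, 0, size)
	}

	for {
		select {
		case msg, ok := <-in:
			if !ok {
				flush() // input closed: send what is left and stop
				return
			}
			batch = append(batch, msg)
			if len(batch) == size {
				flush()
			}
		case <-ticker.C:
			flush()
		}
	}
}
```

With 25 queued messages, a size of 10 and a long interval, this produces batches of 10, 10 and 5. As in the pseudo code, the ticker is not reset after a size-triggered flush, so a partial batch can go out sooner than t after its first message. That still satisfies the "at least once every t" requirement.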
D - Dispatcher Block
This goroutine pushes the messages to SQS.
It keeps receiving from batchChannel. The moment it receives a batch, it pushes it to SQS using the SendMessageBatch function. Inside this function we can also keep track of the number of messages sent and the number of times the SendMessageBatch action is called.
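A sketch of the dispatcher with its counters could look like this. The batchSender interface is a hypothetical stand-in for the SQS client, and stats, dispatch and countingSender are illustrative names. A real implementation would call SendMessageBatch inside SendBatch.

```go
package main

// batchSender is a hypothetical interface standing in for the SQS client;
// a real implementation would call SendMessageBatch in SendBatch.
type batchSender interface {
	SendBatch(batch []string) error
}

// stats tracks what the dispatcher has done so far.
type stats struct {
	Messages int // messages sent in successful requests
	Requests int // number of batch requests attempted
	Failed   int // number of batch requests that returned an error
}

// dispatch is the D block: it drains batchChannel until it is closed,
// sends every batch, and returns the counters.
func dispatch(batchChannel <-chan []string, s batchSender) stats {
	var st stats
	for batch := range batchChannel {
		st.Requests++
		if err := s.SendBatch(batch); err != nil {
			st.Failed++ // a real service would retry or log here
			continue
		}
		st.Messages += len(batch)
	}
	return st
}

// countingSender is a fake sender used only for illustration and tests.
type countingSender struct{ calls int }

func (c *countingSender) SendBatch(batch []string) error {
	c.calls++
	return nil
}
```

One caveat this sketch ignores: a batch request can succeed as a whole while individual entries fail, so a production dispatcher should inspect the per-entry results and retry the failed ones.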
ReceiveMessage
There won’t be many changes in ReceiveMessage. We already receive up to 10 messages per request, and this won’t change.
DeleteMessage
Exactly the same architecture as SendMessage. Using the DeleteMessageBatch request, we can reduce the number of DeleteMessage requests by up to 10 times as well.
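When the receipt handles are already in hand, for example after processing everything one receive call returned, a simpler alternative to the channel pipeline is to chunk them directly. This is a different technique from the architecture above. The sketch is only an illustration, and chunk is our own name.

```go
package main

// chunk splits receipt handles into groups of at most size, so that each
// group can go out in one DeleteMessageBatch request.
func chunk(handles []string, size int) [][]string {
	if size <= 0 {
		return nil
	}
	var groups [][]string
	for start := 0; start < len(handles); start += size {
		end := start + size
		if end > len(handles) {
			end = len(handles) // last group may be smaller
		}
		groups = append(groups, handles[start:end])
	}
	return groups
}
```

For 23 handles and a size of 10 this yields groups of 10, 10 and 3, that is, 3 delete requests instead of 23.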
Impact
The number of SendMessage and DeleteMessage requests is reduced by up to 10 times.