spring-cloud-stream-binder-sqs

Spring Cloud Stream Binder for AWS SQS

spring-cloud-stream-binder-sqs lets you use Spring Cloud Stream with
the AWS Simple Queue Service (SQS).

Installation

<dependencies>
    <dependency>
        <groupId>de.idealo.spring</groupId>
        <artifactId>spring-cloud-stream-binder-sqs</artifactId>
        <version>3.0.0</version>
    </dependency>
</dependencies>

Compatabilty

spring-cloud-stream-binder-sqs spring-boot spring-cloud-aws spring-cloud aws sdk java compiler/runtime
1.9.0 2.7.x 2.4.x 2021.0.5 1.x 8
3.0.0 3.1.x 3.0.x 2022.0.3 2.x 17

Changes in 3.0.0:

  • removed consumer configuration for messageDeletionPolicy: the default behaviour is now that Messages will be
    acknowledged when message processing is successful.
  • renamed consumer configuration for maxNumberOfMessages to maxMessagesPerPoll to align with the naming in
    spring-cloud-aws-sqs. The old property is deprecated but still supported for now.
  • renamed consumer configuration for waitTimeout to pollTimeout to align with the naming in
    spring-cloud-aws-sqs. The old property is deprecated but still supported for now.
  • renamed consumer configuration for queueStopTimeout to listenerShutdownTimeout to align with the naming in
    spring-cloud-aws-sqs. The old property is deprecated but still supported for now.

Usage

With the library in your dependencies you can configure your Spring Cloud Stream bindings as usual. The type name for
this binder is sqs. The destination needs to match the queue name, the specific ARN will be looked up from the
available queue in the account.

You may also provide additional configuration options:

  • Consumers
    • maxMessagesPerPoll – Maximum number of messages to retrieve with one poll to SQS. Must be a number between 1
      and 10.
    • visibilityTimeout – The duration in seconds that polled messages are hidden from subsequent poll requests
      after having been retrieved.
    • pollTimeout – The duration in seconds that the system will wait for new messages to arrive when polling. Uses
      the Amazon SQS long polling feature. The value should be between 1 and 20.
    • listenerShutdownTimeout – The number of milliseconds that the queue worker is given to gracefully finish its
      work on
      shutdown before interrupting the current thread. Default value is 10 seconds.
    • snsFanout – Whether the incoming message has the SNS format and should be deserialized automatically. Defaults
      to true.

Example Configuration:

spring:
  cloud:
    stream:
      sqs:
        bindings:
          someFunction-in-0:
            consumer:
              snsFanout: false
      bindings:
        someFunction-in-0:
          destination: input-queue-name
        someFunction-out-0:
          destination: output-queue-name

You may also provide your own beans of SqsAsyncClient to override those that are created
by spring-cloud-aws-autoconfigure.

FIFO queues

To use FIFO SQS queues
you will need to provide a deduplication id and a group id.
With this binder you may set these using the message headers SqsHeaders.GROUP_ID and SqsHeaders.DEDUPLICATION_ID.
The example below shows how you could use a FIFO queue in real life.

Example Configuration:

spring:
  cloud:
    stream:
      bindings:
        someFunction-in-0:
          destination: input-queue-name
        someFunction-out-0:
          destination: output-queue-name.fifo

class Application {
    @Bean
    public Message<String> someFunction(String input) {
        return MessageBuilder.withPayload(input)
                .setHeader(SqsHeaders.GROUP_ID, "my-application")
                .setHeader(SqsHeaders.DEDUPLICATION_ID, UUID.randomUUID())
                .build();
    }
}

Concurrency

Consumers in the SQS binder support the Spring Cloud Stream concurrency property.
By specifying a value you will launch concurrency threads continuously polling for maxNumberOfMessages each.
The threads will process all messages asynchronously, but each thread will wait for its current batch of messages to all
complete processing before retrieving new ones.
If your message processing is highly variable from message to message it is recommended to set a lower value
for maxNumberOfMessages and a higher value for concurrency.
Note that this will increase the amount of API calls done against SQS.

Example Configuration:

spring:
  cloud:
    stream:
      sqs:
        bindings:
          someFunction-in-0:
            consumer:
              maxMessagesPerPoll: 5
      bindings:
        someFunction-in-0:
          destination: input-queue-name
          consumer:
            concurrency: 10

Visit original content creator repository
https://github.com/idealo/spring-cloud-stream-binder-sqs

Comments

Leave a Reply

Your email address will not be published. Required fields are marked *