Spring Boot SSE With SNS, SQS and S3

Overview

A classic pattern in web applications is that the user requests something that is not immediately available, your app then does a heavy computation and returns the result to the user.

However, since the task is long-running, you cannot make the user wait for the result. One of the easiest ways is to email the result to the user. However, this requires the user to do extra work. With SSE, we can notify the user right on the web application.

In this post, I’ll show you how to do that with AWS technologies. The scenario is

  1. The user requests a report
  2. The backend does computation (async) (or calls other services for computation, e.g. Lambda) then puts the result on s3
  3. The backend then triggers SNS
  4. SNS then triggers SQS
  5. The backend, which is already subscribed to SQS, gets the payload and sendsit to the user via SSE

Let’s implement this. I will omit the user requests part since it’s quite trivial to implement that feature. This post will show you how to handle messages from SNS to SQS and to your app. We will simulate the report available event by publishing a message to SNS. In your actual app, this is the task of your backend or the Lambda.

Flow

workflow

Implementation

Let’s first setup the necessary AWS resources (SNS, SQS)

Create SNS topic & SQS Queue

aws sqs create-queue --queue-name s3-event-download-queue --profile aws-demo

aws sns create-topic --name s3-download-sns-topic --profile aws-demo

After creating the the queue, you will get the queue URL, use the following command to get the queue ARN

aws sqs get-queue-attributes --queue-url YOUR_QUEUE_URL \  
--attribute-name QueueArn --query Attributes.QueueArn --output text --profile aws-demo

Subscribe SQS to SNS

aws sns subscribe \                                                      
  --topic-arn SNS_TOPIC_ARN \
  --protocol sqs \
  --notification-endpoint SQS_QUEUE_ARN \
  --profile aws-demo

Set policy for SNS to publish to SQS

Create a JSON file like this and name it policy.json

{
  "Policy": "{\n  \"Version\": \"2012-10-17\",\n  \"Statement\": [\n    {\n      \"Sid\": \"AllowSNSPublish\",\n      \"Effect\": \"Allow\",\n      \"Principal\": {\"Service\": \"sns.amazonaws.com\"},\n      \"Action\": \"sqs:SendMessage\",\n      \"Resource\": \"SQS_ARN\",\n      \"Condition\": {\n        \"ArnEquals\": {\n          \"aws:SourceArn\": \"SNS_ARN\"\n        }\n      }\n    }\n  ]\n}"
}

Then run the following to allow SNS to publish to SQS

aws sqs set-queue-attributes \                                           ─╯
  --queue-url "SQS_QUEUE_URL" \
  --attributes file://policy.json \
  --profile aws-demo

Now you’re all set. Let’s create the web app.

The Application

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>3.4.4</version>
		<relativePath/> <!-- lookup parent from repository -->
	</parent>
	<groupId>com.datmt.learning.java</groupId>
	<artifactId>web</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<name>web</name>
	<description>web</description>
	<url/>
	<licenses>
		<license/>
	</licenses>
	<developers>
		<developer/>
	</developers>
	<scm>
		<connection/>
		<developerConnection/>
		<tag/>
		<url/>
	</scm>
	<properties>
		<java.version>17</java.version>
	</properties>
	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-thymeleaf</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-websocket</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-security</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>software.amazon.awssdk</groupId>
			<artifactId>s3</artifactId>
			<version>2.31.9</version>
			<scope>compile</scope>
		</dependency>
		<dependency>
			<groupId>software.amazon.awssdk</groupId>
			<artifactId>sqs</artifactId>
			<version>2.31.9</version>
			<scope>compile</scope>
		</dependency>
		<dependency>
			<groupId>software.amazon.awssdk</groupId>
			<artifactId>sns</artifactId>
			<version>2.31.9</version>
			<scope>compile</scope>
		</dependency>
		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>

</project>

Let’s create a simple UserDetailsService with hard coded users bob and Alice:

@Configuration
@EnableWebSecurity
@EnableMethodSecurity
public class SecurityConfig {
    @Bean
    public SecurityFilterChain securityFilterChain(HttpSecurity http) throws Exception {
        http
                .authorizeHttpRequests(authz -> authz
                        .anyRequest().authenticated()
                )
                .httpBasic(Customizer.withDefaults());
        return http.build();
    }

    @Bean
    public UserDetailsService userDetailsService() {
        var bob = User.withUsername("bob")
                .password("{noop}bob")
                .roles("USER")
                .build();
        var alice = User.withUsername("alice")
                .password("{noop}alice")
                .roles("USER")
                .build();
        return new InMemoryUserDetailsManager(bob, alice);
    }
}

Then, create an UI and the SSE endpoint for the user to subscribe to.

Home page

<!DOCTYPE html>
<html xmlns:th="http://www.thymeleaf.org">
<head>
    <title>S3 File Notifications</title>
    <meta charset="UTF-8"/>
    <style>
        body { font-family: sans-serif; max-width: 600px; margin: 2em auto; }
        h2 { color: #2c3e50; }
        .url { padding: 10px; border: 1px solid #ccc; margin-bottom: 10px; word-break: break-all; }
    </style>
</head>
<body>
<h2>Hello, <span th:text="${username}"></span>!</h2>
<p>Waiting for file links...</p>
<div id="messages"></div>

<script>
    const eventSource = new EventSource('/sse');

    eventSource.onmessage = function (event) {
        const div = document.createElement("div");
        div.classList.add("url");
        div.textContent = event.data;
        document.getElementById("messages").prepend(div);
    };

    eventSource.onerror = function () {
        console.error("SSE connection error");
    };
</script>
</body>
</html>

HomeController

@Controller
public class HomeController {
    @GetMapping("/")
    public String home(Model model, Authentication authentication) {
        model.addAttribute("username", authentication.getName());
        return "home";
    }
}

SSEController

@RestController
public class SseController {
    private final Map<String, SseEmitter> emitters = new ConcurrentHashMap<>();

    @GetMapping("/sse")
    public SseEmitter stream(Authentication authentication) {
        String username = authentication.getName();
        SseEmitter emitter = new SseEmitter(0L); // No timeout
        emitters.put(username, emitter);
        emitter.onCompletion(() -> emitters.remove(username));
        emitter.onTimeout(() -> emitters.remove(username));
        return emitter;
    }

    public void sendToUser(String username, String presignedUrl) {
        SseEmitter emitter = emitters.get(username);
        if (emitter != null) {
            try {
                emitter.send(SseEmitter.event().data(presignedUrl));
            } catch (IOException e) {
                emitters.remove(username);
            }
        }
    }
}

Now, let’s create services to handle the incoming SQS messages. Your application needs to poll SQS for the messages, not the other way around.

SQS Polling

@Service
@Slf4j
public class SqsPollingService {
    private final SnsEventHandler snsEventHandler;
    private final SqsClient sqsClient;
    private final String queueUrl;

    public SqsPollingService(SnsEventHandler snsEventHandler) {
        this.snsEventHandler = snsEventHandler;

        this.sqsClient = SqsClient.builder()
                .region(Region.US_EAST_1)
                .credentialsProvider(ProfileCredentialsProvider.create("aws-demo"))
                .build();

        this.queueUrl = sqsClient.getQueueUrl(GetQueueUrlRequest.builder()
                .queueName("s3-event-download-queue")
                .build()).queueUrl();

    }

    @PostConstruct
    void init() {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        executorService.submit(this::startPolling);
    }
    public void startPolling() {
        while (true) {
            ReceiveMessageResponse response = sqsClient.receiveMessage(ReceiveMessageRequest.builder()
                    .queueUrl(queueUrl)
                    .waitTimeSeconds(20) // long polling
                    .maxNumberOfMessages(5)
                    .build());

            for (Message message : response.messages()) {
                handleMessage(message);
                sqsClient.deleteMessage(DeleteMessageRequest.builder()
                        .queueUrl(queueUrl)
                        .receiptHandle(message.receiptHandle())
                        .build());
            }
        }
    }

    private void handleMessage(Message message) {
        log.info("Received message: {}", message.body());
        try {
            // SNS wraps your message in an outer envelope
            ObjectMapper mapper = new ObjectMapper();
            JsonNode outer = mapper.readTree(message.body());
            String innerMessage = outer.get("Message").asText();

            JsonNode payload = mapper.readTree(innerMessage);
            String s3Key = payload.get("s3Key").asText();
            String username = payload.get("username").asText();

            snsEventHandler.handleSnsEvent("cost-report-test-001", s3Key, username);

        } catch (Exception e) {
            log.error("Failed to handle message", e);
        }
    }
}

With this service, when there are messages arrive, it will call snsEventHandler to send message to the user.

SNS Event Handler

@Service
public class SnsEventHandler {
    private final SseController sseController;

    public SnsEventHandler(SseController sseController) {
        this.sseController = sseController;
    }

    // Simulate receiving an SNS event
    public void handleSnsEvent(String bucket, String key, String username) {
        String url = generatePresignedUrl(bucket, key);
        sseController.sendToUser(username, url);
    }

    private String generatePresignedUrl(String bucket, String key) {
        S3Presigner presigner = S3Presigner.builder()
                .credentialsProvider(ProfileCredentialsProvider.create("aws-demo"))
                .region(Region.US_EAST_1)
                .build();

        GetObjectRequest getObjectRequest = GetObjectRequest.builder()
                .bucket(bucket)
                .key(key)
                .build();

        GetObjectPresignRequest presignRequest = GetObjectPresignRequest.builder()
                .signatureDuration(Duration.ofMinutes(10))
                .getObjectRequest(getObjectRequest)
                .build();

        return presigner.presignGetObject(presignRequest).url().toString();
    }
}

Simulate the message

Now, you can publish a message to the SNS topic to see the result:

You can get the full source code for the demo here on GitHub https://github.com/datmt/Spring-tutorial/tree/main/sse-sqs-sns-s3

Conclusion

With AWS solutions like SNS, SQS, you can build a robust workflow for notifying users when a long-running task is done. This is just the PoC, in your production-grade application, consider the solutions for retrying, and resetting the polling when issues occur.

Leave a Comment