Implement SSE with Spring Boot and Angular

Overview

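Recently I worked on a web app that needed to call an external API. The web app consists of an Angular 16 client and a Spring Boot 3 server. The call to that external API can be slow (up to 30 seconds), so holding the HTTP request open until it finishes is not a great experience. Cases like this are a good fit for running the call on a background thread and pushing the result to the client with server-sent events (SSE) once it is ready.

In this post, I will show you how to implement SSE with Spring Boot and Angular.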

The demo app

In this post, I will build a fortune teller application where the user enters her name in the Angular app and hits the submit button, which sends the request to the Spring Boot app. Instead of actually calling an external API, I will simulate the slow call with a simple Thread.sleep.

Upon consulting with my friend, I decided to go with this approach:

[Diagram: Angular and Spring Boot SSE flow]

As you can see from the diagram, the Spring Boot server offers two endpoints:

  1. the /subscribe endpoint (/teller/subscribe/{subscriberId} in the code): The Angular application subscribes to this endpoint in order to receive the result when it’s ready. The Angular app needs to subscribe before requesting the long-running task.
  2. the /future endpoint (/teller/future/{name}/{subscriberId} in the code): This is the endpoint the user submits the request to in order to have her future told. The Spring Boot app hands the work to the long-running service, which computes the fortune asynchronously, and returns immediately with a string telling the user to wait.
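
To make the flow concrete before bringing in Spring, here is a framework-free sketch of the same idea in plain Java. Everything in it (the FlowSketch class, the callback map, the 200 ms delay) is an assumption made up for illustration; the real endpoints follow in the next section.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/** Framework-free sketch of the subscribe-then-request flow (all names are illustrative). */
public class FlowSketch {
    // subscriberId -> callback that pushes a message to that subscriber
    private final Map<String, Consumer<String>> subscribers = new ConcurrentHashMap<>();

    /** Step 1: the client registers first so the result has somewhere to go. */
    public void subscribe(String subscriberId, Consumer<String> onMessage) {
        subscribers.put(subscriberId, onMessage);
    }

    /** Step 2: start the slow work in the background and answer immediately. */
    public String requestFuture(String subscriberId, String name) {
        CompletableFuture.runAsync(() -> {
            String fortune = slowLookup(name);
            Consumer<String> subscriber = subscribers.get(subscriberId);
            if (subscriber != null) {
                subscriber.accept(fortune); // Step 3: push the result when it is ready
            }
        });
        return "Your future is being told!";
    }

    // Stand-in for the slow external API call.
    private static String slowLookup(String name) {
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return name + " will have a bright future!";
    }

    public static void main(String[] args) {
        FlowSketch server = new FlowSketch();
        CompletableFuture<String> received = new CompletableFuture<>();
        server.subscribe("abc", received::complete);
        System.out.println(server.requestFuture("abc", "Alice")); // returned right away
        System.out.println(received.join());                       // pushed later
    }
}
```

The order matters: if the request were sent before the subscription existed, the background task could finish with nobody registered to receive the result.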

The Spring Boot App

Let’s start with the Spring Boot app first. The main class needs @EnableAsync, which turns on Spring’s support for running @Async methods on a separate thread.

@SpringBootApplication
@EnableAsync
public class FortuneTellerServerApplication {
    public static void main(String[] args) {
        SpringApplication.run(FortuneTellerServerApplication.class, args);
    }
}

The controller is quite simple with two endpoints:

@RestController
@RequestMapping("/teller")
@CrossOrigin(origins = "http://localhost:4200")
public class FortuneTellerController {
    private final FortuneTellerService fortuneTellerService;
    private final Logger logger = LoggerFactory.getLogger(FortuneTellerController.class);

    public FortuneTellerController(FortuneTellerService fortuneTellerService) {
        this.fortuneTellerService = fortuneTellerService;
    }

    @GetMapping("/future/{name}/{subscriberId}")
    public SimpleResponse tellFuture(@PathVariable String name, @PathVariable String subscriberId) {
        fortuneTellerService.tellingFuture(subscriberId, name);
        return new SimpleResponse("Your future is being told!");
    }

    @GetMapping("/subscribe/{subscriberId}")
    public SseEmitter streamSse(@PathVariable String subscriberId) {
        SseEmitter emitter = new SseEmitter();
        logger.info("Emitter created with timeout {} for subscriberId {}", emitter.getTimeout(), subscriberId);
        SseEmitterManager.addEmitter(subscriberId, emitter);

        // Clean up when the SSE connection times out (optional)
        emitter.onTimeout(() -> {
            logger.info("Emitter timed out");
            emitter.complete();
            SseEmitterManager.removeEmitter(subscriberId);
        });

        // Clean up once the connection completes, e.g. after a timeout, an error or a client disconnect (optional)
        emitter.onCompletion(() -> {
            logger.info("Emitter completed");
            SseEmitterManager.removeEmitter(subscriberId);
        });

        return emitter;
    }
}

record SimpleResponse(String content) {
}

The main logic for SSE happens here. When the Angular app sends the request to the /subscribe endpoint, an emitter is created and added to the SseEmitterManager class.

I need to pass a subscriber id with each request to the /subscribe endpoint because I need to send the notification to the right user. This id is generated by the Angular app using UUID. Don’t do this in production: you should not let the user send the id without verifying their permission.

The SseEmitterManager is a simple class with static methods that stores the emitters in a map. You can replace this with a @Service if you prefer.

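import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Logger;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

public class SseEmitterManager {
    private static final Logger logger = Logger.getLogger(SseEmitterManager.class.getName());
    // Request threads and @Async worker threads touch this map concurrently, so it must be thread-safe
    private static final Map<String, SseEmitter> emitters = new ConcurrentHashMap<>();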

    public static void addEmitter(String subscriberId, SseEmitter emitter) {
        emitters.put(subscriberId, emitter);
    }

    public static void removeEmitter(String subscriberId) {
        emitters.remove(subscriberId);
    }

    public static void sendSseEventToClients(String subscriberId, String data) {
        var emitter = emitters.get(subscriberId);
        if (emitter == null) {
            logger.warning("No client with subscriber Id " + subscriberId + " found!");
            return;
        }
        try {
            emitter.send(data);
        } catch (IOException e) {
            logger.warning("Error sending event to client: " + e.getMessage());
        }
    }
}
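
It may help to see what ends up on the wire. An SSE response is a plain text/event-stream body in which each event is a handful of "field: value" lines followed by a blank line. The sketch below builds such an event by hand; SseFormat is a made-up helper for illustration and not how Spring formats events internally (Spring’s exact whitespace may differ).

```java
/** Illustrative formatter for the text/event-stream wire format (not Spring's own code). */
public class SseFormat {
    /**
     * Builds one SSE event. Each line of the payload gets its own "data:" field,
     * an optional "event:" field names the event, and a blank line ends the event.
     */
    public static String format(String eventName, String data) {
        StringBuilder sb = new StringBuilder();
        if (eventName != null) {
            sb.append("event: ").append(eventName).append('\n');
        }
        for (String line : data.split("\n", -1)) {
            sb.append("data: ").append(line).append('\n');
        }
        return sb.append('\n').toString();
    }

    public static void main(String[] args) {
        // An unnamed event: the browser delivers it to EventSource.onmessage
        System.out.print(format(null, "Alice will have a bright future!"));
        // A named event with a two-line payload
        System.out.print(format("Error", "line one\nline two"));
    }
}
```

An event without an "event:" field has the default type "message", which is why a plain emitter.send(data) on the server is enough for the browser’s onmessage handler to fire.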

Finally, here is the service that handles the long-running task.

@Service
public class FortuneTellerService {
    private final Logger logger = Logger.getLogger(FortuneTellerService.class.getName());
    @Async
    public void tellingFuture(String subscriberId, String name) {
        Random random = new Random();
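        try {
            logger.info("Processing future for " + name);
            // Simulate the slow external API call
            Thread.sleep(random.nextInt(5_000));
        } catch (InterruptedException e) {
            // Restore the interrupt flag and stop instead of swallowing the interruption
            Thread.currentThread().interrupt();
            logger.warning("Interrupted while processing future for " + name);
            return;
        }
        logger.info("Finished processing future for " + name);
        String processedData = String.format("With %d percent certainty, %s will have a %s future!", random.nextInt(100), name, random.nextBoolean() ? "bright" : "dark");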
        SseEmitterManager.sendSseEventToClients(subscriberId, processedData);
    }
}

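Notice the @Async annotation on tellingFuture. Together with @EnableAsync in the main class, it makes Spring run the method on a separate thread, so the controller can return its response immediately instead of waiting for the task to finish. This works because the controller calls the method through the Spring-managed bean; a call from inside the same class would bypass Spring’s proxy and run synchronously.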

That’s all I need for the server. Let’s move on to the client.

The Angular app

The meat of the Angular app is the fortune teller service. This is where the frontend app subscribes and sends a request to the backend server:

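import { Injectable } from '@angular/core';
import { HttpClient } from '@angular/common/http';
import { Observable, Subject, shareReplay } from 'rxjs';
import { v4 as uuid } from 'uuid'; // assumes the uuid npm package

// Mirrors the SimpleResponse record returned by the server
export interface SimpleResponse {
  content: string;
}

@Injectable({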
  providedIn: 'root'
})
export class FortuneTellerService {

  private eventSource: EventSource | undefined;
  private sseDataSubject: Subject<string> = new Subject<string>();
  private static retryCount = 0;
  private static readonly MAX_RETRIES = 5;
  //generate unique id for each subscriber
  private static subscriberId = uuid();
  constructor(private httpClient: HttpClient) {
  }

  private connectToSSE() {
    this.eventSource = new EventSource(`http://localhost:8080/teller/subscribe/${FortuneTellerService.subscriberId}`);
    console.log('creating event source');
    this.eventSource.onmessage = event => {
      console.log('received event', event)
      this.sseDataSubject.next(event.data);
    };

    this.eventSource.onerror = error => {
      console.log('error', error);
      this.eventSource!.close();
      if (FortuneTellerService.retryCount >= FortuneTellerService.MAX_RETRIES) {
        console.log('too many retries');
        // Erroring the subject terminates it for good, so do this only when giving up
        this.sseDataSubject.error(error);
        return;
      }
      FortuneTellerService.retryCount++;
      this.connectToSSE();
    };

  }
  subscribeToFortuneTeller(): Observable<string> {
    if (!this.eventSource) {
      this.connectToSSE();
    }
    return this.sseDataSubject.asObservable();
  }

  requestFortuneTeller(name: string): Observable<SimpleResponse> {
    // Encode the name so spaces and slashes don't break the URL path
    return this.httpClient.get<SimpleResponse>(`http://localhost:8080/teller/future/${encodeURIComponent(name)}/${FortuneTellerService.subscriberId}`).pipe(shareReplay());
  }

}

There isn’t much code here. There are two main methods:

  • subscribeToFortuneTeller: This is what the Angular app calls to subscribe to the result when it’s ready. I put it in ngOnInit. If you don’t know Angular, you can call this method when the document is ready.
  • requestFortuneTeller: The end user will call this to request the fortune telling.

Demo time

Here is a quick demo so you can see the app in action:

If you say it’s one of the ugliest UIs ever, I’d agree, and also thank you for your compliment :).

However, you can see that SSE is working.

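I’m going to do some performance tests to see how to add this feature to production. I noticed that the emitters sometimes time out, which is why I added the retry logic in the Angular app. Note that new SseEmitter() falls back to the default async timeout of the MVC configuration or the underlying server; the constructor also accepts an explicit timeout in milliseconds.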

Sending Headers With EventSource

After creating the demo app, I was excited because it worked. However, when I implemented it in my production app, I hit one issue: I couldn’t send bearer tokens with EventSource.

In fact, the EventSource API doesn’t let you set any custom request headers (as of the time of this writing).

So, if you want to send headers (in most cases you would), you need to find a different solution. In my case, I found this library, which consumes server-sent events through the Fetch API instead of EventSource: https://github.com/Azure/fetch-event-source

Using fetchEventSource, you can modify the Angular service like so:

// at the top of the file:
// import { fetchEventSource, EventStreamContentType } from '@microsoft/fetch-event-source';
// class FatalError extends Error {}

private connectToSSE() {
    const sse = this;
    fetchEventSource(`http://localhost:8080/teller/subscribe/${FortuneTellerService.subscriberId}`, {
      method: 'GET',
      headers: {
        'Accept': EventStreamContentType,
        'Authorization': `Bearer ${YOUR_BEARER_TOKEN}`
      },
      async onopen(response) {
        if (response.ok && response.headers.get('content-type')?.startsWith(EventStreamContentType)) {
          return; // everything's good
        } else if (response.status >= 400 && response.status < 500 && response.status !== 429) {
          // client-side errors are usually non-retriable:
          throw new FatalError('Client error: ' + response.status);
        } else {
          // server errors and 429 are worth retrying:
          throw new Error('Unexpected response: ' + response.status);
        }
      },
      onmessage(msg) {
        // if the server emits an error message, throw an exception
        // so it gets handled by the onerror callback below:
        if (msg.event === 'Error') {
          throw new FatalError(msg.data);
        }
        // the demo server sends the fortune as plain text, so there is nothing to JSON.parse:
        sse.sseDataSubject.next(msg.data);
      },
      onclose() {
        // if the server closes the connection unexpectedly, retry:
        throw new Error('Connection closed unexpectedly');
      },
      onerror(err) {
        console.log('error', err);
        if (err instanceof FatalError) {
          throw err; // rethrowing stops the retries
        }
        // returning without throwing lets the library retry
      }
    });
}

Now your client app can pass headers when making requests to your backend just fine.
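
Because a fetch-based client only sees a raw response body, it has to split the stream into events itself. As a rough illustration of that parsing step, here is a minimal sketch, written in Java to stay consistent with the server code. SseParser and Event are hypothetical names, the parser only understands the "event" and "data" fields and \n line endings, and it is not the library’s implementation.

```java
import java.util.ArrayList;
import java.util.List;

/** Minimal text/event-stream parser sketch; handles only "event" and "data" fields. */
public class SseParser {
    public record Event(String name, String data) {}

    public static List<Event> parse(String stream) {
        List<Event> events = new ArrayList<>();
        String name = "message";              // default event type
        StringBuilder data = new StringBuilder();
        boolean hasData = false;

        for (String line : stream.split("\n", -1)) {
            if (line.isEmpty()) {             // blank line: dispatch the pending event
                if (hasData) {
                    events.add(new Event(name, data.toString()));
                }
                name = "message";
                data.setLength(0);
                hasData = false;
            } else if (line.startsWith("data:")) {
                if (hasData) {
                    data.append('\n');        // multi-line payloads are joined with \n
                }
                data.append(stripField(line, "data:"));
                hasData = true;
            } else if (line.startsWith("event:")) {
                name = stripField(line, "event:");
            }
            // comment lines (":...") and other fields (id, retry) are ignored in this sketch
        }
        return events;                        // an unterminated trailing event is dropped
    }

    // Drops the field name and at most one leading space from the value.
    private static String stripField(String line, String prefix) {
        String value = line.substring(prefix.length());
        return value.startsWith(" ") ? value.substring(1) : value;
    }

    public static void main(String[] args) {
        String body = "data: hi\n\nevent: Error\ndata: a\ndata: b\n\n";
        parse(body).forEach(e -> System.out.println(e.name() + " -> " + e.data()));
    }
}
```

The event name is what the onmessage callback above inspects through msg.event, and the joined payload is what it reads from msg.data.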

Conclusion

There you have it: a complete app using SSE to handle long-running tasks. I encourage you to check out the source code on GitHub and give it a try.
