Implementing Server-Sent Events (SSE) using Go
Have you ever wondered what's it like to build a web app to show a stream of data, for example receiving a live sports score, or showing the price of cryptocurrency conversion rate every second? If you notice, building those systems is unidirectional in the sense that we only send or push data from our server to the client. Thus we use SSE because of one main reason; it's unidirectional (we can also use WebSocket, but it's expensive). How it simply works is basically the client initiates a regular HTTP request to the server. Then the client simply waits for a response with a stream of messages over time. This blog is going to show only to make that work on the server by creating two endpoints one for broadcasting or sending a message to all connected clients and another one for clients to subscribe and listen for any message. Follow the upcoming blog on decoupling this service to get a stream of data from a message broker (RabbitMQ) and then fan out to this service to broadcast the message to connected clients.
How to implement it in Go
We initially start by declaring a struct for us to save our client's storage and of course check for events like broadcasting or closing a specific client's connection.
// package models => entity.go
type Broker struct {
/*
Events are pushed to this channel by the main events-gathering routine
*/
Notifier chan []byte
// New client connections
NewClients chan chan []byte
// Closed client connections
ClosingClients chan chan []byte
// Client connections registry
Clients map[chan []byte]bool
}
We shall then initialize a variable of Broker
// package models => data.go
InitBroker = &Broker{
Notifier: make(chan []byte, 1),
NewClients: make(chan chan []byte),
ClosingClients: make(chan chan []byte),
Clients: make(map[chan []byte]bool),
}
After creating the broker instance, we should listen and wait for three actions to happen. Those are; When a client comes, a client disconnect from our service, and last if a notification message has arrived from our second endpoint.
// package models => sse_layer.go
func (brokerRepo *brokerRepository) Listen() {
for {
select {
case s := <-brokerRepo.dbBroker.NewClients:
// A new client has joined
brokerRepo.dbBroker.Clients[s] = true
log.Printf("Client added. %d registered clients", len(brokerRepo.dbBroker.Clients))
case s := <-brokerRepo.dbBroker.ClosingClients:
// A client has dettached
// remove them from our clients map
delete(brokerRepo.dbBroker.Clients, s)
log.Printf("Removed client. %d registered clients", len(brokerRepo.dbBroker.Clients))
case event := <-brokerRepo.dbBroker.Notifier:
// case for getting a new msg
// Thus send it to all clients
for clientMessageChan, _ := range brokerRepo.dbBroker.Clients {
clientMessageChan <- event
}
}
}
}
Last but not least we ended up adding our HTTP handler for our endpoint to manage our clients:
//source: api -> rest -> sse -> sse.go
func (f *eventServiceHandler) Stream(w http.ResponseWriter, r *http.Request) {
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "Streaming unsupported!", http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("Access-Control-Allow-Origin", "*")
// Each connection registers its own message channel with the Broker's connections registry
messageChan := make(chan []byte)
//Get broker
broker := f.service.GetBroker()
// Signal the broker that we have a new connection
broker.NewClients <- messageChan
// Remove this client from the map of connected clients
// when this handler exits.
defer func() {
broker.ClosingClients <- messageChan
}()
go func() {
// Listen to connection close and un-register messageChan
<-r.Context().Done()
broker.ClosingClients <- messageChan
}()
for {
// Write to the ResponseWriter
// Server Sent Events compatible
fmt.Fprintf(w, "data: %s\n\n", <-messageChan)
// Flush the data immediatly instead of buffering it for later.
flusher.Flush()
}
}
Broadcasting a Message
In the real-world scenario, we would have a service to fetch data from other third-party services and publish it directly to this service or through a message broker (will implement this one in the next blog). But for this example, we'll just include a second endpoint to broadcast a message to the clients.
func (f *eventServiceHandler) BroadcastMessage(w http.ResponseWriter, r *http.Request) {
var message models.MessageBroker
err := json.NewDecoder(r.Body).Decode(&message)
if err != nil {
serializer.JSON(w, http.StatusBadRequest, &serializer.GenericResponse{
Success: false,
Code: http.StatusBadRequest,
Message: "cant parse request",
})
return
}
byteData, err := json.Marshal(message)
if err != nil {
serializer.JSON(w, http.StatusBadRequest, &serializer.GenericResponse{
Success: false,
Code: http.StatusBadRequest,
Message: "parging struct to byte",
})
return
}
broker := f.service.GetBroker()
broker.Notifier <- byteData
serializer.JSON(w, http.StatusOK, &serializer.GenericResponse{
Success: true,
Code: http.StatusOK,
})
}
Our Client
Our client in our case will just be an Angular service. We'll use an EventSource instance to connect to our SSE service. EventSource opens a persistent connection to an HTTP server, which sends events in text/event-stream
format.
Simply put, our component.ts will look like this:
import { Component } from '@angular/core';
@Component({
selector: 'app-root',
templateUrl: './app.component.html',
styleUrls: ['./app.component.scss'],
})
export class AppComponent {
title = 'sse-test';
messageData: any;
ngOnInit(): void {
this.connect();
}
connect(): void {
let source = new EventSource('http://0.0.0.0:8181/api/v1/sso/stream');
source.addEventListener('message', (message) => {
this.messageData = JSON.parse(message.data);
});
source.onerror = (err) => {};
}
}
And showing the message using our HTML file:
<div>
Message: {{this.messageData?.message}}
</div>
Result

Github source: https://github.com/MerNat/sso-clean-arch