Implementing Server-Sent Events (SSE) using Go

Tech Stuff Aug 15, 2022

Have you ever wondered what it's like to build a web app that shows a stream of data, for example a live sports score or a cryptocurrency conversion rate that updates every second? Systems like these are unidirectional: data is only sent, or pushed, from our server to the client. That is the main reason we use SSE here. WebSocket would also work, but it is bidirectional and heavier than we need for a one-way feed.

How it works is simple. The client initiates a regular HTTP request to the server, then keeps the connection open and waits for a response that arrives as a stream of messages over time.

This post only shows how to make that work on the server. We'll create two endpoints: one for broadcasting a message to all connected clients, and another for clients to subscribe and listen for messages. An upcoming post will decouple this service so that it gets its stream of data from a message broker (RabbitMQ) and fans it out to the connected clients.
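To make "a stream of messages" concrete, here is a minimal sketch of how a single event is framed in the text/event-stream format: an optional `event:` line, one `data:` line per line of payload, and a blank line to end the event. The helper name `formatSSE` is hypothetical and is not part of the code in this post.

```go
package main

import "strings"

// formatSSE frames one message in the text/event-stream format.
// An empty event name omits the "event:" field, so browsers deliver
// the message as a plain "message" event.
func formatSSE(event, data string) string {
	var b strings.Builder
	if event != "" {
		b.WriteString("event: " + event + "\n")
	}
	// Each line of the payload needs its own "data:" field.
	for _, line := range strings.Split(data, "\n") {
		b.WriteString("data: " + line + "\n")
	}
	// A blank line marks the end of the event.
	b.WriteString("\n")
	return b.String()
}
```

The handler later in this post writes the same shape by hand with `fmt.Fprintf(w, "data: %s\n\n", ...)`, which is enough as long as the payload is a single line such as compact JSON.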

How to implement it in Go

We start by declaring a struct that keeps track of our connected clients and carries the channels for the events we care about: broadcasting a message, a client connecting, and a client disconnecting.

// package models => entity.go
type Broker struct {
	// Events are pushed to this channel by the main events-gathering routine
	Notifier chan []byte

	// New client connections
	NewClients chan chan []byte

	// Closed client connections
	ClosingClients chan chan []byte

	// Client connections registry
	Clients map[chan []byte]bool
}

We then initialize a Broker instance:

// package models => data.go
var InitBroker = &Broker{
    Notifier:       make(chan []byte, 1),
    NewClients:     make(chan chan []byte),
    ClosingClients: make(chan chan []byte),
    Clients:        make(map[chan []byte]bool),
}

After creating the broker instance, we listen and wait for one of three things to happen: a client connects, a client disconnects from our service, or a notification message arrives from our second endpoint.

// package models => sse_layer.go

// Listen blocks forever, so it should be started in its own goroutine.
func (brokerRepo *brokerRepository) Listen() {
	for {
		select {
		case s := <-brokerRepo.dbBroker.NewClients:
			// A new client has joined
			brokerRepo.dbBroker.Clients[s] = true
			log.Printf("Client added. %d registered clients", len(brokerRepo.dbBroker.Clients))
		case s := <-brokerRepo.dbBroker.ClosingClients:
			// A client has detached;
			// remove it from our clients map
			delete(brokerRepo.dbBroker.Clients, s)
			log.Printf("Removed client. %d registered clients", len(brokerRepo.dbBroker.Clients))
		case event := <-brokerRepo.dbBroker.Notifier:
			// A new message has arrived;
			// send it to all connected clients
			for clientMessageChan := range brokerRepo.dbBroker.Clients {
				clientMessageChan <- event
			}
		}
	}
}
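One caveat with the loop above: the send to each client channel blocks until that client receives, so a single slow client can stall the broadcast for everyone. Below is a minimal sketch of a non-blocking alternative that skips clients that are not ready, assuming it is acceptable to drop an event for a busy client. The function name `fanOut` is hypothetical and is not part of the original repository.

```go
package main

// fanOut delivers event to every client channel that can accept it
// right now and returns how many clients were skipped.
func fanOut(clients map[chan []byte]bool, event []byte) (skipped int) {
	for ch := range clients {
		select {
		case ch <- event:
			// Delivered, or queued in the channel's buffer.
		default:
			// The client is not ready; drop the event for this client
			// instead of blocking the whole broadcast.
			skipped++
		}
	}
	return skipped
}
```

If you go this route, giving each client channel a small buffer (for example `make(chan []byte, 16)`) makes drops much less likely, since a non-blocking send to an unbuffered channel only succeeds while the receiver is already waiting.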

Last but not least, we add the HTTP handler for the endpoint that clients subscribe to:

//source: api -> rest -> sse -> sse.go
func (f *eventServiceHandler) Stream(w http.ResponseWriter, r *http.Request) {
	flusher, ok := w.(http.Flusher)

	if !ok {
		http.Error(w, "Streaming unsupported!", http.StatusInternalServerError)
		return
	}

	w.Header().Set("Content-Type", "text/event-stream")
	w.Header().Set("Cache-Control", "no-cache")
	w.Header().Set("Connection", "keep-alive")
	w.Header().Set("Access-Control-Allow-Origin", "*")

	// Each connection registers its own message channel with the Broker's connections registry
	messageChan := make(chan []byte)

	//Get broker
	broker := f.service.GetBroker()

	// Signal the broker that we have a new connection
	broker.NewClients <- messageChan

	// Remove this client from the map of connected clients
	// when this handler exits.
	defer func() {
		broker.ClosingClients <- messageChan
	}()

	for {
		select {
		case <-r.Context().Done():
			// The client disconnected: return so the deferred
			// function un-registers messageChan exactly once.
			return
		case msg := <-messageChan:
			// Write to the ResponseWriter in the Server-Sent Events format
			fmt.Fprintf(w, "data: %s\n\n", msg)

			// Flush the data immediately instead of buffering it for later.
			flusher.Flush()
		}
	}
}
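A handler like this can be exercised without a browser by using the standard library's `net/http/httptest` package. The sketch below is a simplified stand-in, not the handler from the repository: `streamFrom` and `firstLine` are hypothetical names, and the broker is replaced by a plain channel so the example stays self-contained.

```go
package main

import (
	"bufio"
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
)

// streamFrom returns a handler that writes every message received on
// messages as one SSE event and stops when the client disconnects.
func streamFrom(messages <-chan []byte) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		flusher, ok := w.(http.Flusher)
		if !ok {
			http.Error(w, "streaming unsupported", http.StatusInternalServerError)
			return
		}
		w.Header().Set("Content-Type", "text/event-stream")
		for {
			select {
			case <-r.Context().Done():
				return // the client went away
			case msg := <-messages:
				fmt.Fprintf(w, "data: %s\n\n", msg)
				flusher.Flush()
			}
		}
	}
}

// firstLine starts a throwaway test server around the handler, queues
// one message, and returns the first line a client reads from the stream.
func firstLine(payload string) (string, error) {
	messages := make(chan []byte, 1)
	srv := httptest.NewServer(streamFrom(messages))
	defer srv.Close()

	messages <- []byte(payload)

	resp, err := http.Get(srv.URL)
	if err != nil {
		return "", err
	}
	// Closing the body drops the connection, which cancels the
	// request context and lets the handler return.
	defer resp.Body.Close()

	line, err := bufio.NewReader(resp.Body).ReadString('\n')
	if err != nil {
		return "", err
	}
	return strings.TrimRight(line, "\n"), nil
}
```

Calling `firstLine("hello")` should hand back the first framed line of the stream, which is a quick way to confirm both the framing and the disconnect handling.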

Broadcasting a Message

In a real-world scenario, we would have a service that fetches data from third-party services and publishes it to this service, either directly or through a message broker (we'll implement that in the next post). For this example, we'll just include a second endpoint that broadcasts a message to the clients.

func (f *eventServiceHandler) BroadcastMessage(w http.ResponseWriter, r *http.Request) {
	var message models.MessageBroker

	err := json.NewDecoder(r.Body).Decode(&message)

	if err != nil {
		serializer.JSON(w, http.StatusBadRequest, &serializer.GenericResponse{
			Success: false,
			Code:    http.StatusBadRequest,
			Message: "can't parse request",
		})
		return
	}

	byteData, err := json.Marshal(message)

	if err != nil {
		serializer.JSON(w, http.StatusBadRequest, &serializer.GenericResponse{
			Success: false,
			Code:    http.StatusBadRequest,
			Message: "failed to marshal message",
		})
		return
	}

	broker := f.service.GetBroker()

	broker.Notifier <- byteData

	serializer.JSON(w, http.StatusOK, &serializer.GenericResponse{
		Success: true,
		Code:    http.StatusOK,
	})

}
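Whichever producer ends up feeding the broker, note that the send on `Notifier` blocks once its buffer of one message is full and the `Listen` loop has not picked the message up yet. Here is a minimal sketch of a publish helper that gives up after a timeout instead of blocking the caller indefinitely. The name `publish` and the timeout behavior are assumptions for illustration and are not part of the original repository.

```go
package main

import "time"

// publish tries to hand data to the notifier channel and gives up
// after timeout, reporting whether the message was accepted.
func publish(notifier chan<- []byte, data []byte, timeout time.Duration) bool {
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case notifier <- data:
		return true
	case <-timer.C:
		return false
	}
}
```

A handler could call something like `publish(broker.Notifier, byteData, time.Second)` and answer with an error status when it returns false, so a stalled broker shows up as a failed request rather than a hanging one.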

Our Client

Our client will just be an Angular component. We'll use an EventSource instance to connect to our SSE service. EventSource opens a persistent connection to an HTTP server, which sends events in text/event-stream format.

Simply put, our component.ts will look like this:

import { Component, OnInit } from '@angular/core';

@Component({
  selector: 'app-root',
  templateUrl: './app.component.html',
  styleUrls: ['./app.component.scss'],
})
export class AppComponent implements OnInit {
  title = 'sse-test';
  messageData: any;

  ngOnInit(): void {
    this.connect();
  }

  connect(): void {
    const source = new EventSource('http://0.0.0.0:8181/api/v1/sso/stream');
    // Events sent without an "event:" field arrive as 'message' events
    source.addEventListener('message', (message) => {
      this.messageData = JSON.parse(message.data);
    });

    // EventSource reconnects automatically; log the error so failures are visible
    source.onerror = (err) => console.error('SSE connection error', err);
  }
}

And we show the message in our HTML template:

<div>
  Message: {{ messageData?.message }}
</div>
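In the browser, EventSource does the parsing of the stream for us. To consume the same endpoint outside a browser, for example from a Go test or a small CLI, the parsing has to be done by hand. The following is a minimal sketch that only handles `data:` fields; `readEvents` and `parseString` are hypothetical helper names.

```go
package main

import (
	"bufio"
	"io"
	"strings"
)

// readEvents parses a text/event-stream body and returns the data
// payload of each event, joining multi-line payloads with "\n".
func readEvents(r io.Reader) ([]string, error) {
	var events []string
	var data []string

	scanner := bufio.NewScanner(r)
	for scanner.Scan() {
		line := scanner.Text()
		switch {
		case line == "":
			// Blank line: the current event is complete.
			if len(data) > 0 {
				events = append(events, strings.Join(data, "\n"))
				data = nil
			}
		case strings.HasPrefix(line, "data:"):
			// One optional space after the colon is not part of the value.
			value := strings.TrimPrefix(line, "data:")
			data = append(data, strings.TrimPrefix(value, " "))
		}
		// Other fields (event:, id:, retry:) and comment lines are ignored here.
	}
	return events, scanner.Err()
}

// parseString is a convenience wrapper for in-memory input, handy in tests.
func parseString(s string) ([]string, error) {
	return readEvents(strings.NewReader(s))
}
```

Against a live server, `readEvents` would be given the response body of a GET request to the stream endpoint. Because that stream never ends on its own, a real client would more likely process each event as it completes rather than collect them into a slice.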

Result

GitHub source: https://github.com/MerNat/sso-clean-arch

Meron Hayle

Hi there, I'm Meron, a software engineer, an entrepreneur, and an artist, also known as a ninja in the art world.