Photo by Daria Nepriakhina 🇺🇦 / Unsplash

Implementing Server-Sent Events (SSE) service with a Message Broker in Go

Tech Stuff Oct 10, 2022

In the previous blog, We implemented a simple SSE (server-sent event) service using Go. Let us now make the service more fun and advance it in a way that we fetch data from a third-party service and pipe it to a message broker service (i.e. in our case we'll use RabbitMQ) and finally publish it to any client who subscribes to our service.  To make the service simple we'll make it a Monolith architecture. In the real world though, making it a micro-service architecture is well-suited for such an environment.

In our previous blog, we're sending the data received from our API endpoint and returned it to all connected clients. In this blog, we're going to decouple our previously made service to use a source from an external third-party service and pipe it through a message broker. Our message broker (RabbitMQ) will be having a "fan-out" exchange so that it'll publish data to any service connected to it. This is really important as we basically scale out (horizontal scaling) our SSE service when deploying and let all replicas subscribe to our message broker.

Let's begin

Let us first see the architecture in a high-level diagram.

Let us now break down the module of our service

Module One: Auto Data fetcher from third-party API service

This service is an independent module within the SSE service to handle the logic of fetching data from the external third-party service (CoinAPI.io) and act as a producer which in turn will send the data to our message broker.

The source code lives in a directory called "fetching-crypto-service" within the given GitHub repository. We should have our message broker running before running the script though, to do so, just "docker-compose up" in the root directory of the repository.

The next step will be registering to CoinAPI.io and getting an API token to retrieve/fetch an exchange rate of a specific digital currency (i.e. BTC -> USD).

We export the token to our current environment with the name "COIN_API_KEY" and run the main.go file as follows

$ export COIN_API_KEY=<YOUR_TOKEN>

# Then run our fetching-crypto-service using
$ go run main.go

After that, you'll be seeing the service will retrieve data from the "coinapi.io" endpoint and send it to our message broker. N.B. This service will run for a total time of 1 minute and fetch data every 1 second (n.b. this can be changed in the script).

Now that we have our data in our message broker, we should wait for the broker to publish data to subscribers. Our subscriber in our case is SSE service. Then SSE service will be pushing/sending the message to all connected clients.

Module Two: Our listener/subscriber to RabbitMQ

Our listener function is basically waiting for any new message from our message broker (rabbitMQ) and sending it to all connected SSE clients.

Our subscriber/listener module looks like this:

# Taken from messaging_layer.go
func (mRepo *messagingRepository) ListenAndSend() {
	//We should listen and of course trigger to send to all connected clients once a message arrives from our r.mq
	ch, err := mRepo.rMqBroker.Channel()
	if err != nil {
		panic("Failed to open a channel")
	}
	defer ch.Close()

	//We need to make sure exchange exists
	err = ch.ExchangeDeclare(
		"crypto-info", // name of exchange
		"fanout",      // type of exchange
		true,          // durable
		false,         // auto-deleted
		false,         // internal
		false,         // no-wait
		nil,           // arguments
	)

	if err != nil {
		panic("Failed to declare an exchange")
	}

	q, err := ch.QueueDeclare(
		"",    // name (temporary queue)
		false, // durable (we don't need it after server restarts or connection of channel closes)
		false, // delete when unused
		true,  // exclusive
		false, // no-wait
		nil,   // arguments
	)
	if err != nil {
		panic("Failed to declare a queue")
	}

	//Bind exchange with queue
	err = ch.QueueBind(
		q.Name,        // queue name
		"",            // routing key
		"crypto-info", // exchange
		false,
		nil,
	)

	if err != nil {
		panic("Failed to bind a queue")
	}

	msgs, err := ch.Consume(
		q.Name, // queue
		"",     // consumer
		true,   // auto-ack
		false,  // exclusive
		false,  // no-local
		false,  // no-wait
		nil,    // args
	)

	if err != nil {
		panic("Failed to register a consumer")
	}

	var close chan struct{}

	go func() {
		for d := range msgs {
			var coinApi CointAPI
			if err := json.Unmarshal(d.Body, &coinApi); err != nil {
				log.Fatal(err.Error())
			}
			byteData, _ := json.Marshal(coinApi)
			mRepo.broker.Notifier <- byteData
			log.Printf("Sending cryto info to client(s)")
		}
	}()

	<-close
}

implemented SSE service using Go

External Module: Our Angular client

Our client in our case is simply an angular app with the usage of EventSource when communicating with our SSE server.

import { Component } from '@angular/core';

@Component({
  selector: 'app-root',
  templateUrl: './app.component.html',
  styleUrls: ['./app.component.scss'],
})
export class AppComponent {
  title = 'sse-test';
  cryptoData: any;
  counter: number = 0;

  ngOnInit(): void {
    this.connect();
  }

  connect(): void {
    const source = new EventSource('http://0.0.0.0:8181/api/v1/sso/stream');
    source.addEventListener('message', (message) => {
      this.counter += 1;
      this.cryptoData = JSON.parse(message.data);
      this.cryptoData = {
        ...this.cryptoData,
        rate: parseFloat(this.cryptoData.rate).toFixed(2),
      };
    });

    source.onerror = (err) => {
      console.log(err);
    };
  }
}

And our HTML template file will simply be looking like this:

<div>
  Number of times we're receiving data: {{this.counter}}<br/>
  1 ETH Price Now: USD {{cryptoData?.rate}}
</div>

Result

Conclusion

With Go, it's super simple to integrate and decouple services according to what kind of system we want to implement. We also have a variety of tools that we can choose when picking a service for a message broker. We can also pick Kafka which is more powerful and performant than RabbitMQ, though our use case should be working with massive data or high throughput cases. Here's a good practice on how to use and manage RabbitMQ from their official site.

source: https://github.com/MerNat/sso-clean-arch

Tags

Meron Hayle

Hi there, I'm Meron, a software engineer, an entrepreneur, and an artist, also known as a ninja in the art world.