Nats Jetstream Implementation Steps

Introduction

NATS JetStream is a data streaming feature built on top of the core NATS messaging system. It provides enhanced features for building scalable and durable data streaming applications. Here's a step-by-step introduction to how to implement NATS JetStream

Install NATS Server

We have quite different options to install NATS; please follow the few options.

  • Docker approach (https://docs.nats.io/running-a-nats-service/introduction/installation#installing-via-docker)
  • Download the executable file and run it on the local machine (https://github.com/nats-io/nats-server/releases/)
  • Install from Source (go install github.com/nats-io/natscli/nats@latest)

In this article, we are going to look at installing from the source approach. However, the rest options need to be followed the same for further development.

1. Start NATS Server and Enable JetStream Option

  • Once you downloaded nats-server-v2.10.7-darwin-arm64.zip, please extract it and execute the "nats-server.exe" in the new command window.
  • Please use this command to run the NATS
    • nats-server.exe ( which just turns on the NATS without the persistence layer option means without jetstream, however, it has its own NATS streaming concept).
    • nats-server -js -m 8080( this option will enable jetstream).
      Nats jetstream
    • Assuming you could see the difference between these two start points.
    • If there are any changes required in your NATS, please amend those changes in nats-server.conf file.
      jetstream: {
        store_dir: "./datastore",<!--Path where you want to store the message-->
        max_memory_store: 1024MB <!--Size of your memory store-->
      }
      

2. Connect NATS from Code

func JetStreamInit() (nats.JetStreamContext, error) {
	// Connect to NATS
	log.Printf("Connect to NATS")
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		return nil, err
	}
    log.Printf("Connected to NATS")
	// Create JetStream Context
	js, err := nc.JetStream(nats.PublishAsyncMaxPending(256))
	if err != nil {
		return nil, err
	}

	// Create a stream if it does not exist
    // please note we have an option to create this in NATS CLI
    // please play with "NATS Cheat" command
	err = CreateStream(js)
	if err != nil {
		return nil, err
	}

	return js, nil
}

3. Create a Stream

In our proof of concept, Please find our stream and subject name configuration.

package config

const (
	StreamName     = "MESSAGE"
	StreamSubjects = "MESSAGE.*"

	SubjectNameRequested  = "MESSAGE.requested"
	SubjectNameResponded = "MESSAGE.responded"
)

Code Explanation

  • The Stream name acts as a root, and the subject will act as a child node.
  • I created a few more subjects, such as requested, and responded. This use case is subscriber can get the message according to their subscription.
func CreateStream(jetStream nats.JetStreamContext) error {
	stream, err := jetStream.StreamInfo(config.StreamName)

	// stream not found, create it
	if stream == nil {
		log.Printf("Creating stream: %s\n", config.StreamName)

		_, err = jetStream.AddStream(&nats.StreamConfig{
			Name:     config.StreamName,
			Subjects: []string{config.StreamSubjects},
		})
		if err != nil {
			return err
		}
	}
	return nil
}

4. Publish Messages

Now we have Stream and Subject. It's time to publish the message to JetStream.

func publish(js nats.JetStreamContext) {

	log.Printf("publish")	

	err := js.Publish(config.SubjectNameRequested, "Requested Msg")
	if err != nil {
			log.Println(err)
		} else {
			log.Printf("Publishing  =>  Message")
		}
}

5. Create Consumers

func consume(js nats.JetStreamContext) {
	_, err := js.Subscribe(config.SubjectNameRequested, func(m *nats.Msg) {
		err := m.Ack()

		if err != nil {
			log.Println("Unable to Ack", err)
			return
		}

		log.Printf("Subscriber / Consumer  =>  Subject: %s \n", m.Subject, m.Data)

		},nats.ManualAck())

	if err != nil {
		log.Println("Subscribe failed")
		return
	}
}

6. Acknowledge Messages

Streams support acknowledges receiving a message. If you send a Request() to a subject covered by the configuration of the Stream, the service will reply to you once it stores the message. If you just publish, it will not. A Stream can be set to disable Acknowledgements by setting NoAck it to true in its configuration.

7. Build and Run

Finally, build and run.

Build and Run

Reference

Conclusion

Implementing NATS JetStream involves configuring and enabling JetStream in the NATS Server, creating streams for durable message storage, publishing messages to defined streams, and setting up consumers to efficiently consume and acknowledge messages. With additional features like acknowledgment modes and consumer rate limiting, NATS JetStream provides a robust foundation for building scalable and resilient data streaming applications. Explore the documentation for more advanced configurations and tailor the implementation to meet specific use case requirements.


Similar Articles