clients.natsclient #

NATSClient

This module implements a client that can connect to a NATS server, allowing it to exchange messages with other clients through this server. Readers are advised to go through the NATS documentation before continuing with this document.

Publishing and subscribing

In short, NATS is a publish-subscribe messaging system. A client can publish a message to a specific subject, and other clients will receive it if they have subscribed to that subject. A subject hierarchy can be built by using the . character:

something.anotherthing.anotherone

The characters * (matches exactly one token at that level of the hierarchy) and > (matches one or more tokens from that level down, and must be the last token) are wildcards when used in a subscription request. Subscriptions created with any of the subjects below will receive messages published to something.anotherthing.anotherone.

something.*.anotherone
something.anotherthing.*
something.anotherthing.>
something.>
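The matching rules can be sketched in a few lines of V. This is an illustration only, not the matching code of the NATS server; the function name subject_matches is hypothetical, and the last pattern in the loop is added to show a subscription that does not match.

```v
// subject_matches reports whether a subscription pattern matches a subject.
// `*` matches exactly one token; `>` matches one or more trailing tokens.
fn subject_matches(pattern string, subject string) bool {
	p := pattern.split('.')
	s := subject.split('.')
	for i, tok in p {
		if tok == '>' {
			// `>` must be the last pattern token and needs at least one token to match
			return i == p.len - 1 && s.len > i
		}
		if i >= s.len {
			return false
		}
		if tok != '*' && tok != s[i] {
			return false
		}
	}
	return p.len == s.len
}

subject := 'something.anotherthing.anotherone'
patterns := ['something.*.anotherone', 'something.anotherthing.*', 'something.anotherthing.>',
	'something.>', 'something.*']
for pattern in patterns {
	println('${pattern} -> ${subject_matches(pattern, subject)}')
}
```

The first four patterns match the subject; something.* does not, because * covers a single token only.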

Streams and Consumers

Core NATS does not guarantee 'at least once' service: a published message is only delivered to the clients that are subscribed at that moment, and there is no guarantee that any client received it. Fortunately JetStream (a built-in distributed persistence system) provides that guarantee through the concept of streams and consumers.

Streams are 'message stores'. They define how the messages are stored (and many other things) on the server side.

Consumers allow clients to consume messages and acknowledge the receipt of the messages, thus providing the 'at least once' service.

JetStream can also guarantee 'exactly once' service through a couple of API requests (and with the help of its clients).

NATSClient under the hood

The NATSClient struct implements the NATS client protocol, which lets the client publish and subscribe to subjects over a websocket. It can also create streams and consumers by publishing messages to system subjects. The payload of each of those messages contains the configuration of the request as JSON; the schemas can be found at https://github.com/nats-io/jsm.go/tree/main/schemas/jetstream/api/v1. NATS allows you to pass a reply subject when publishing a message and sends its response to that subject.
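As a rough sketch of what such a request looks like on the wire, the snippet below builds a PUB message that asks JetStream to create a stream. It does not use NATSClient itself: MinimalStreamConfig and pub_frame are hypothetical names, the configuration is reduced to two fields, and the stream name, subjects and reply subject are example values. The system subject follows the $JS.API.STREAM.CREATE.<stream> pattern that appears in the nats-cli log further down.

```v
import json

// Hypothetical, heavily reduced stream configuration (see StreamConfig for more fields).
struct MinimalStreamConfig {
	name     string
	subjects []string
}

// pub_frame builds a NATS `PUB` message: subject, reply subject, payload size in bytes, payload.
fn pub_frame(subject string, reply_to string, payload string) string {
	return 'PUB ${subject} ${reply_to} ${payload.len}\r\n${payload}\r\n'
}

payload := json.encode(MinimalStreamConfig{
	name:     'mystream'
	subjects: ['ORDERS.*']
})
// The raw string (r'...') keeps the `$` of the system subject literal.
frame := pub_frame(r'$JS.API.STREAM.CREATE.' + 'mystream', 'myinbox', payload)
print(frame)
```

NATS sends its response to myinbox, so the client has to be subscribed to that subject before publishing the request.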

Under the hood: Pulling messages from a consumer

Messages can be pulled from a consumer by sending the following message:

PUB $JS.API.CONSUMER.MSG.NEXT.mystream.myconsumer myinbox 49\r\n{"expires":5000000000,"batch":20,"no_wait":false}\r\n

NATS will start consuming messages from the consumer 'myconsumer' and will send them to the subject myinbox, which the NATSClient has to subscribe to. The configuration tells NATS to:

  • send batches of 20 messages
  • make the pull request expire after 5 seconds (5000000000 nanoseconds)
  • wait for messages until the pull request expires if there are no messages
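The pull request above could be assembled along these lines. This is a minimal sketch rather than the code NATSClient uses: PullRequest and pull_frame are hypothetical names, with PullRequest mirroring the three ConsumerNextMSG fields used in the example. The expiry is derived from the time module, whose Duration type counts nanoseconds.

```v
import json
import time

// Mirrors the ConsumerNextMSG fields that the example request sets.
struct PullRequest {
	expires i64
	batch   int
	no_wait bool
}

// pull_frame builds the PUB message that requests the next batch from a consumer.
fn pull_frame(stream string, consumer string, inbox string, req PullRequest) string {
	payload := json.encode(req)
	// The raw string (r'...') keeps the `$` of the system subject literal.
	subject := r'$JS.API.CONSUMER.MSG.NEXT.' + '${stream}.${consumer}'
	return 'PUB ${subject} ${inbox} ${payload.len}\r\n${payload}\r\n'
}

req := PullRequest{
	// 5 seconds expressed in nanoseconds
	expires: i64(5 * time.second)
	batch:   20
	no_wait: false
}
print(pull_frame('mystream', 'myconsumer', 'myinbox', req))
```

The byte count in the PUB line is taken from the encoded payload, so it stays correct when the configuration changes.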

Under the hood: Acknowledging messages

In order to guarantee 'at least once' service NATS requires you to acknowledge messages. After you send a pull request, NATS sends the messages to the subject that was specified in the request (reply_to field). Those messages use the MSG format of the NATS client protocol (HMSG when they carry headers). The NATSMessageParser struct in nats_parser.v shows how those messages can be parsed. The reply_to field in those messages contains the system subject that should be used to acknowledge the message. It should be similar to $JS.ACK.mystream.myconsumer.11.39.618.1674209322098595786.0. You can then send the message shown below in order to acknowledge:

PUB $JS.ACK.mystream.myconsumer.11.39.618.1674209322098595786.0 0\r\n\r\n
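To make the round trip concrete, here is a simplified sketch that parses a header-less MSG message, takes the acknowledgement subject from its reply_to field and builds the acknowledgement. It is not the parser from nats_parser.v: Delivery, parse_msg and ack_frame are hypothetical names, the incoming message is made up for the example, and HMSG is not handled.

```v
// Delivery holds the parts of a `MSG` protocol message (no headers).
struct Delivery {
	subject  string
	sid      string
	reply_to string
	payload  string
}

// parse_msg parses `MSG <subject> <sid> [reply-to] <#bytes>\r\n<payload>\r\n`.
fn parse_msg(frame string) !Delivery {
	line_end := frame.index('\r\n') or { return error('missing CRLF after control line') }
	fields := frame[..line_end].fields()
	if fields.len < 4 || fields.len > 5 || fields[0] != 'MSG' {
		return error('not a MSG control line')
	}
	size := fields.last().int()
	start := line_end + 2
	if size < 0 || frame.len < start + size {
		return error('payload shorter than announced')
	}
	return Delivery{
		subject:  fields[1]
		sid:      fields[2]
		reply_to: if fields.len == 5 { fields[3] } else { '' }
		payload:  frame[start..start + size]
	}
}

// ack_frame builds the empty-payload PUB that acknowledges a delivery.
fn ack_frame(ack_subject string) string {
	return 'PUB ${ack_subject} 0\r\n\r\n'
}

ack_subject := r'$JS.ACK.mystream.myconsumer.11.39.618.1674209322098595786.0'
delivery := parse_msg('MSG myinbox 1 ${ack_subject} 5\r\nhello\r\n') or { panic(err) }
print(ack_frame(delivery.reply_to))
```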

Settings

Many settings can modify the behavior of NATS. The file nats_types.v contains some of them, which we should still experiment with. For example:

  • the time to wait before an acknowledgement is considered unsuccessful
  • the number of times NATS will send the message again after an unsuccessful acknowledgement
  • whether we acknowledge all messages (default) or only the last of a batch, which will acknowledge all of the messages before that
  • etc.
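The three settings listed above correspond to the ack_wait, max_deliver and ack_policy fields of ConsumerConfig. The snippet below only shows how such values could be encoded as JSON; AckSettings is a hypothetical struct and the chosen values are arbitrary examples, not recommendations.

```v
import json
import time

// Hypothetical subset of the consumer configuration; field names follow ConsumerConfig.
struct AckSettings {
	ack_policy  string
	ack_wait    i64
	max_deliver int
}

settings := AckSettings{
	// every message has to be acknowledged individually
	ack_policy: 'explicit'
	// wait 10 seconds (in nanoseconds) before an acknowledgement counts as missing
	ack_wait: i64(10 * time.second)
	// deliver a message at most 3 times
	max_deliver: 3
}
println(json.encode(settings))
```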

Run the example

You need two things in order to run the example. First you need to run a nats-server using the NATS configuration that you can find in example/nats_configuration.json. You can enable JetStream using the -js argument.

nats-server -c example/nats_configuration.json -js -DV

Now you can run the example:

v run example/test_natsclient.v

This test consumes messages sent to "ORDERS.*". It also creates a key-value store, adds a key and fetches it. To test the incoming messages you can run:

nats publish ORDERS.NEW "this is my message"

If you want to test batches of messages, you can add the argument --count=<SOMEVALUE>.

How to contribute

There is still a lot to experiment with:

  • For now the messages are acknowledged as soon as they are received. We might want to change that behavior. We could allow users of the NATSClient struct to acknowledge them themselves, so that messages are only acknowledged once the user has fully processed them.
  • We should experiment with the many settings provided by NATS.
  • We could use either Redis or a channel to pass the received messages to the user of NATSClient. Redis would make it more persistent while a channel would be more performant. The choice will depend on the user of the NATSClient. Maybe we can provide both.
  • Many of the publish messages in NATSClient do not use a reply_to subject, meaning NATS will not send a response to the request. We should experiment with the reply_to subject. This might help to introduce failsafes (sending the acknowledgement again if it failed, etc).
  • Testing the NATSClient.
  • Opening up a key-value store to the user. The nats-cli does this using a stream with specific settings (you can see some logs below). It sets max_msgs_per_subject to 1. Adding a key-value pair to the store boils down to publishing a message (value) to a subject (key). The logs when running the command nats kv add mykvstore:
    PUB $JS.API.STREAM.CREATE.KV_mykvstore _INBOX.XOYl5CWiVL1vUPwgo8WAvh.lsgIf50w 378
    {\"name\":\"KV_mykvstore\",\"subjects\":[\"$KV.mykvstore.\\u003e\"],\"retention\":\"limits\",\"max_consumers\":-1,\"max_msgs\":-1,\"max_bytes\":-1,\"discard\":\"new\",\"max_age\":0,\"max_msgs_per_subject\":1,\"max_msg_size\":-1,\"storage\":\"file\",\"num_replicas\":1,\"duplicate_window\":120000000000,\"placement\":{\"cluster\":\"\"},\"deny_delete\":true,\"allow_rollup_hdrs\":true,\"allow_direct\":true,\"mirror_direct\":false}
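The naming scheme visible in that log (stream KV_mykvstore, subjects $KV.mykvstore.>) suggests the following mapping from a store and key to a stream and subject. This is a sketch based on the log only; the helper names are hypothetical and are not part of NATSClient.

```v
// kv_stream_name returns the name of the stream that backs a key-value store.
fn kv_stream_name(store string) string {
	return 'KV_${store}'
}

// kv_subject returns the subject a value is published to for a given key.
fn kv_subject(store string, key string) string {
	// The raw string (r'...') keeps the `$` of the subject literal.
	return r'$KV.' + '${store}.${key}'
}

println(kv_stream_name('mykvstore'))
println(kv_subject('mykvstore', 'mykey'))
```

Because max_msgs_per_subject is 1, publishing a new value to the same subject replaces the previous value for that key.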

Constants #

const natsclient_version = '${natsclient_version_major}.${natsclient_version_minor}.${natsclient_version_patch}'

fn new_natsclient #

fn new_natsclient(address string, ch_messages chan NATSMessage, ch_keyvalue chan NATSKeyValue, logger &log.Logger) !&NATSClient

struct ConnectConfig #

struct ConnectConfig {
	verbose       bool
	pedantic      bool
	tls_required  bool
	name          string
	lang          string
	version       string
	protocol      int
	echo          bool
	headers       bool
	no_responders bool
}

Configuration of all possible publish messages can be found at: https://github.com/nats-io/jsm.go/tree/main/schemas/jetstream/api/v1

struct ConsumerConfig #

struct ConsumerConfig {
	stream_name string
	config      struct {
		// TODO experiment with the configuration parameters below
		// A unique name for a consumer
		name string
		// A short description of the purpose of this consumer
		description string
		// Should be one of:
		//   none: messages do not need to be acknowledged
		//   all: only acknowledge the last message of a series of messages, all others will be acknowledged too
		//   explicit: each message must be acknowledged
		ack_policy string
		// How long (in nanoseconds) to allow messages to remain un-acknowledged before attempting redelivery (default 30000000000)
		// ack_wait int
		// The number of times a message will be redelivered to consumers if not acknowledged in time // default -1
		// max_deliver int
		// Filter which messages you want to receive from the stream
		// filter_subject string
		// Should be one of:
		//   instant = receive all messages as fast as possible (default)
		//   original = receive messages in the timing they were published
		// replay_policy string
		// The maximum number of messages without acknowledgement that can be outstanding, once this limit is reached message delivery will be suspended (default 1000)
		// max_ack_pending int
		// The number of pulls that can be outstanding on a pull consumer, pulls received after this is reached are ignored (default 512)
		// max_waiting int
		// The largest batch property that may be specified when doing a pull on a Pull Consumer (default 0)
		// max_batch int
		// The maximum expires value that may be set when doing a pull on a Pull Consumer
		// max_expires int
		// The maximum bytes value that may be set when doing a pull on a Pull Consumer (default 0)
		// max_bytes int
		// When set do not inherit the replica count from the stream but specifically set it to this amount
		num_replicas int = 1
		// Force the consumer state to be kept in memory rather than inherit the setting from the stream (default false)
		// mem_state bool
	}
}

configuration for creating a consumer

struct ConsumerNextMSG #

struct ConsumerNextMSG {
	// A duration from now when the pull should expire, stated in nanoseconds, 0 for no expiry
	expires i64
	// How many messages the server should deliver to the requestor
	batch int
	// Sends at most this many bytes to the requestor, limited by consumer configuration max_bytes
	// max_bytes int
	// When true a response with a 404 status header will be returned when no messages are available (default false)
	no_wait bool
	// When not 0 idle heartbeats will be sent on this interval
	// idle_heartbeat int
}

configuration for pulling the next message(s) from a consumer

struct NATSClient #

@[heap]
struct NATSClient {
mut:
	uuid                string
	myinbox             string
	websocket           &websocket.Client
	address             string
	stream_to_consumers map[string]ConsumerList

	ch_messages   chan NATSMessage
	ch_keyvalue   chan NATSKeyValue
	natsmsgparser &NATSMessageParser
pub mut:
	// pull every 5 seconds
	pull_frequency int = 5
	logger         &log.Logger
}

fn (NATSClient) ack_message #

fn (mut cl NATSClient) ack_message(ack_subject string)

fn (NATSClient) create_stream #

fn (mut cl NATSClient) create_stream(name string, subjects []string) !

fn (NATSClient) create_consumer #

fn (mut cl NATSClient) create_consumer(name string, stream string, description string) !

fn (NATSClient) subscribe #

fn (mut cl NATSClient) subscribe(subject string) !string

fn (NATSClient) publish #

fn (mut cl NATSClient) publish(subject string, reply_to string, message string) !

fn (NATSClient) hpublish #

fn (mut cl NATSClient) hpublish(subject string, headers map[string]string, reply_to string, message string) !

fn (NATSClient) create_keyvalue_store #

fn (mut cl NATSClient) create_keyvalue_store(name string) !

fn (NATSClient) add_keyvalue #

fn (mut cl NATSClient) add_keyvalue(store string, key string, value string) !

fn (NATSClient) set_value #

fn (mut cl NATSClient) set_value(store string, key string, value string) !

fn (NATSClient) get_value #

fn (mut cl NATSClient) get_value(store string, key string) !

fn (NATSClient) listen #

fn (mut cl NATSClient) listen() !

struct NATSKeyValue #

struct NATSKeyValue {
pub mut:
	store     string
	key       string
	value     string
	timestamp string
}

struct NATSMessage #

struct NATSMessage {
pub mut:
	subject  string
	sid      string
	reply_to string
	message  string
}

struct NATSMessageParser #

struct NATSMessageParser {
mut:
	data           string
	i              int
	subject        string
	sid            string
	reply_to       string
	message_length int
	message        string
	header_length  int
	headers        map[string]string
pub mut:
	on_nats_message fn (message NATSMessage, headers map[string]string) = fn (message NATSMessage, headers map[string]string) {}
	on_nats_info    fn (data string)  = fn (data string) {}
	on_nats_ping    fn ()             = fn () {}
	on_nats_pong    fn ()             = fn () {}
	on_nats_error   fn (error string) = fn (error string) {}
}

fn (NATSMessageParser) parse #

fn (mut n NATSMessageParser) parse(data string) !

fn (NATSMessageParser) parse_err #

fn (mut n NATSMessageParser) parse_err() !string

fn (NATSMessageParser) parse_info #

fn (mut n NATSMessageParser) parse_info() !string

fn (NATSMessageParser) parse_hmsg #

fn (mut n NATSMessageParser) parse_hmsg() !

fn (NATSMessageParser) parse_msg #

fn (mut n NATSMessageParser) parse_msg() !

struct StreamConfig #

struct StreamConfig {
	// A unique name for the Stream, empty for Stream Templates.
	name string
	// A short description of the purpose of this stream
	description string
	// A list of subjects to consume, supports wildcards. Must be empty when a mirror is configured. May be empty when sources are configured.
	subjects []string
	// How messages are retained in the Stream, once this is exceeded old messages are removed. Should be one of:
	// 	limits = (default)
	// 	interest =
	// 	workqueue
	retention string = 'limits'
	// How many Consumers can be defined for a given Stream. -1 for unlimited. (default -1)
	max_consumers i64 = -1
	// How many messages may be in a Stream, oldest messages will be removed if the Stream exceeds this size. -1 for unlimited. (default -1)
	max_msgs i64 = -1
	// For wildcard streams ensure that for every unique subject this many messages are kept - a per subject retention limit (default -1)
	max_msgs_per_subject i64 = -1
	// How big the Stream may be, when the combined stream size exceeds this old messages are removed. -1 for unlimited. (default -1)
	max_bytes i64 = -1
	// Maximum age of any message in the stream, expressed in nanoseconds. 0 for unlimited. (default 0)
	max_age i64 = 0
	// The largest message that will be accepted by the Stream. -1 for unlimited. (default -1)
	max_msg_size int = -1
	// The storage backend to use for the Stream. One of: "file" (default), "memory"
	storage string = 'file'
	// How many replicas to keep for each message, best to choose 1 (no security), 3 (secure and performant) or 5 (super secure, no performance) (default 1, max 5)
	num_replicas int = 1
	// Disables acknowledging messages that are received by the Stream. (default false)
	// no_ack bool
	// When the Stream is managed by a Stream Template this identifies the template that manages the Stream.
	// template_owner string
	// 	tags []string
	//}
	// When a Stream reaches its limits either old messages are deleted or new ones are denied. Should be "new" or "old"
	discard string = 'old'
	// The time window to track duplicate messages for, expressed in nanoseconds. 0 for default
	duplicate_window i64 = 0
	// Placement directives to consider when placing replicas of this stream, random placement when unset
	placement struct {
		cluster string
	}
	// Sealed streams do not allow messages to be deleted via limits or API, sealed streams can not be unsealed via configuration update. Can only be set on already created streams via the Update API
	// sealed bool = false
	// Restricts the ability to delete messages from a stream via the API. Cannot be changed once set to true
	deny_delete bool = false
	// Restricts the ability to purge messages from a stream via the API. Cannot be changed once set to true
	// deny_purge bool = false
	// Allows the use of the Nats-Rollup header to replace all contents of a stream, or subject in a stream, with a single new message
	allow_rollup_hdrs bool = false
	// Allow higher performance, direct access to get individual messages
	allow_direct bool = false
	// Allow higher performance, direct access for mirrors as well
	mirror_direct bool = false
	// Rules for republishing messages from a stream with subject mapping onto new subjects for partitioning and more
	// republish struct {
	// The source object to republish
	// src string
	// The destination to publish to
	// dest string
	// Only send message headers, no bodies
	// headers_only bool = false
	//}
	// When discard policy is new and the stream is one with max messages per subject set, this will apply the new behavior to every subject. Essentially turning discard new from maximum number of subjects into maximum number of messages in a subject.
	// discard_new_per_subject bool = false
}