added nats + worker modules
This commit is contained in:
94
worker/README.md
Normal file
94
worker/README.md
Normal file
@@ -0,0 +1,94 @@
|
||||
# go-worker
|
||||
|
||||
A simple, reusable, production-ready background worker module for NATS JetStream in Go.
|
||||
|
||||
This module allows you to easily run reliable, concurrent background workers that consume messages from JetStream streams using pull-based subscriptions. It integrates seamlessly with the `go-nats` embedded/clustered NATS module and follows the same clean, functional-options pattern as the rest of your toolkit.
|
||||
|
||||
## Features
|
||||
|
||||
- Pull-based JetStream consumer with configurable batch fetching
|
||||
- Concurrent message processing (configurable workers)
|
||||
- Automatic stream and consumer creation (idempotent)
|
||||
- Configurable storage (Memory or File) for development vs production durability
|
||||
- Graceful shutdown with context cancellation and subscription draining
|
||||
- Explicit Ack/Nak handling with retry support
|
||||
- Custom logger injection
|
||||
- Works with embedded or external NATS servers
|
||||
|
||||
## Installation
|
||||
|
||||
```bash
|
||||
go get git.citc.tech/go/web/worker
|
||||
```
|
||||
|
||||
## Quick Start
|
||||
|
||||
```go
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log/slog"
|
||||
"os"
|
||||
|
||||
"git.citc.tech/go/web/nats"
|
||||
"git.citc.tech/go/web/worker"
|
||||
)
|
||||
|
||||
func main() {
|
||||
logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))
|
||||
|
||||
// Connect to NATS (embedded or external)
|
||||
nc, natsShutdown, err := nats.New()
|
||||
if err != nil {
|
||||
logger.Error("failed to connect to NATS", "error", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
defer natsShutdown()
|
||||
|
||||
// Create background worker
|
||||
w, err := worker.New(nc,
|
||||
worker.WithStream("ORDERS"),
|
||||
worker.WithConsumer("order-processor", "orders.process"),
|
||||
worker.WithHandler(func(ctx context.Context, msg *nats.Msg) error {
|
||||
logger.Info("processing order", "data", string(msg.Data))
|
||||
// Your business logic here
|
||||
return nil
|
||||
}),
|
||||
worker.WithConcurrency(5),
|
||||
worker.WithStorage(nats.FileStorage), // or MemoryStorage for dev
|
||||
worker.WithLogger(logger),
|
||||
)
|
||||
if err != nil {
|
||||
logger.Error("failed to create worker", "error", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
// Run worker (blocks until shutdown signal)
|
||||
go w.Run()
|
||||
|
||||
// Your main application (e.g. HTTP server) runs here...
|
||||
|
||||
// On exit
|
||||
defer w.Shutdown()
|
||||
}
|
||||
```
|
||||
|
||||
## Options
|
||||
|
||||
| Option | Description | Default |
|
||||
|-------------------------|--------------------------------------------------|------------------------|
|
||||
| `WithStream(name)` | JetStream stream name | required |
|
||||
| `WithConsumer(name, subject)` | Consumer name and filter subject | required |
|
||||
| `WithDurable(name)` | Durable consumer name (if different from consumer) | uses consumer name |
|
||||
| `WithHandler(h)` | Message processing function | required |
|
||||
| `WithConcurrency(n)` | Number of concurrent goroutines | 1 |
|
||||
| `WithAckWait(d)` | Time to wait for acknowledgment | 30s |
|
||||
| `WithMaxDeliver(n)` | Maximum delivery attempts | 1 |
|
||||
| `WithLogger(l)` | Custom slog logger | slog.Default() |
|
||||
| `WithStorage(t)` | Stream storage type (MemoryStorage or FileStorage) | FileStorage (safe) |
|
||||
|
||||
## Storage Types
|
||||
|
||||
- `nats.MemoryStorage` — Fast, in-memory only (lost on restart). Great for dev/testing.
|
||||
- `nats.FileStorage` — Persistent on disk (survives restarts). Recommended for production.
|
||||
11
worker/errors.go
Normal file
11
worker/errors.go
Normal file
@@ -0,0 +1,11 @@
|
||||
package worker
|
||||
|
||||
import "errors"
|
||||
|
||||
var (
|
||||
ErrInvalidHandler = errors.New("handler is nil")
|
||||
ErrStreamNotFound = errors.New("stream not found")
|
||||
ErrConsumerNotFound = errors.New("consumer not found")
|
||||
ErrStreamNameRequired = errors.New("stream name required")
|
||||
ErrSubjectRequired = errors.New("subject required")
|
||||
)
|
||||
67
worker/options.go
Normal file
67
worker/options.go
Normal file
@@ -0,0 +1,67 @@
|
||||
package worker
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log/slog"
|
||||
"time"
|
||||
|
||||
"github.com/nats-io/nats.go"
|
||||
)
|
||||
|
||||
type Handler func(context.Context, *nats.Msg) error
|
||||
|
||||
type Option func(*config)
|
||||
|
||||
type config struct {
|
||||
streamName string
|
||||
consumerName string
|
||||
durableName string
|
||||
handler Handler
|
||||
concurrency int
|
||||
ackWait time.Duration
|
||||
maxDeliver int
|
||||
filterSubject string
|
||||
deliverPolicy nats.DeliverPolicy
|
||||
replayPolicy nats.ReplayPolicy
|
||||
log *slog.Logger
|
||||
storage nats.StorageType
|
||||
}
|
||||
|
||||
func WithStream(name string) Option {
|
||||
return func(c *config) { c.streamName = name }
|
||||
}
|
||||
|
||||
func WithConsumer(consumer, subject string) Option {
|
||||
return func(c *config) {
|
||||
c.consumerName = consumer
|
||||
c.filterSubject = subject
|
||||
}
|
||||
}
|
||||
|
||||
func WithDurable(name string) Option {
|
||||
return func(c *config) { c.durableName = name }
|
||||
}
|
||||
|
||||
func WithHandler(h Handler) Option {
|
||||
return func(c *config) { c.handler = h }
|
||||
}
|
||||
|
||||
func WithConcurrency(n int) Option {
|
||||
return func(c *config) { c.concurrency = n }
|
||||
}
|
||||
|
||||
func WithAckWait(d time.Duration) Option {
|
||||
return func(c *config) { c.ackWait = d }
|
||||
}
|
||||
|
||||
func WithMaxDeliver(n int) Option {
|
||||
return func(c *config) { c.maxDeliver = n }
|
||||
}
|
||||
|
||||
func WithLogger(l *slog.Logger) Option {
|
||||
return func(c *config) { c.log = l }
|
||||
}
|
||||
|
||||
func WithStorage(storage nats.StorageType) Option {
|
||||
return func(c *config) { c.storage = storage }
|
||||
}
|
||||
172
worker/worker.go
Normal file
172
worker/worker.go
Normal file
@@ -0,0 +1,172 @@
|
||||
package worker
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"log/slog"
|
||||
"os"
|
||||
"os/signal"
|
||||
"sync"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/nats-io/nats.go"
|
||||
)
|
||||
|
||||
type Worker struct {
|
||||
js nats.JetStreamContext
|
||||
sub *nats.Subscription
|
||||
handler Handler
|
||||
log *slog.Logger
|
||||
concurrency int
|
||||
}
|
||||
|
||||
func New(nc *nats.Conn, opts ...Option) (*Worker, error) {
|
||||
cfg := &config{
|
||||
concurrency: 1,
|
||||
ackWait: 30 * time.Second,
|
||||
maxDeliver: 1,
|
||||
deliverPolicy: nats.DeliverAllPolicy,
|
||||
replayPolicy: nats.ReplayInstantPolicy,
|
||||
log: slog.Default(),
|
||||
storage: nats.FileStorage,
|
||||
}
|
||||
|
||||
for _, opt := range opts {
|
||||
opt(cfg)
|
||||
}
|
||||
|
||||
if cfg.handler == nil {
|
||||
return nil, ErrInvalidHandler
|
||||
}
|
||||
|
||||
if cfg.streamName == "" {
|
||||
return nil, ErrStreamNameRequired
|
||||
}
|
||||
|
||||
if cfg.filterSubject == "" {
|
||||
return nil, ErrSubjectRequired
|
||||
}
|
||||
|
||||
js, err := nc.JetStream()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
w := &Worker{
|
||||
js: js,
|
||||
log: cfg.log,
|
||||
handler: cfg.handler,
|
||||
concurrency: cfg.concurrency,
|
||||
}
|
||||
|
||||
_, err = js.StreamInfo(cfg.streamName)
|
||||
if err != nil {
|
||||
if errors.Is(err, nats.ErrStreamNotFound) {
|
||||
w.log.Info("creating stream", "name", cfg.streamName)
|
||||
|
||||
_, err = js.AddStream(&nats.StreamConfig{
|
||||
Name: cfg.streamName,
|
||||
Subjects: []string{cfg.filterSubject + ".>"},
|
||||
Storage: cfg.storage,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
} else {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
if cfg.consumerName == "" {
|
||||
cfg.consumerName = cfg.durableName
|
||||
}
|
||||
|
||||
consumerCfg := &nats.ConsumerConfig{
|
||||
Durable: cfg.durableName,
|
||||
AckPolicy: nats.AckExplicitPolicy,
|
||||
AckWait: cfg.ackWait,
|
||||
MaxDeliver: cfg.maxDeliver,
|
||||
FilterSubject: cfg.filterSubject,
|
||||
DeliverPolicy: cfg.deliverPolicy,
|
||||
ReplayPolicy: cfg.replayPolicy,
|
||||
}
|
||||
|
||||
_, err = js.AddConsumer(cfg.streamName, consumerCfg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
w.sub, err = js.PullSubscribe(cfg.filterSubject, cfg.consumerName, nats.BindStream(cfg.streamName))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return w, nil
|
||||
}
|
||||
|
||||
func (w *Worker) Run() {
|
||||
w.log.Info("background worker started")
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
sigs := make(chan os.Signal, 1)
|
||||
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
|
||||
go func() {
|
||||
<-sigs
|
||||
w.log.Info("shutting down background worker")
|
||||
cancel()
|
||||
}()
|
||||
|
||||
var wg sync.WaitGroup
|
||||
for i := 0; i < w.concurrency; i++ {
|
||||
wg.Add(1)
|
||||
go func(id int) {
|
||||
defer wg.Done()
|
||||
w.processMessages(ctx, id)
|
||||
}(i)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
w.log.Info("background worker stopped")
|
||||
}
|
||||
|
||||
func (w *Worker) processMessages(ctx context.Context, workerID int) {
|
||||
batchSize := 10
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
default:
|
||||
msgs, err := w.sub.Fetch(
|
||||
batchSize,
|
||||
nats.Context(ctx),
|
||||
nats.MaxWait(5*time.Second),
|
||||
)
|
||||
if err != nil {
|
||||
if errors.Is(err, nats.ErrTimeout) {
|
||||
continue
|
||||
}
|
||||
w.log.Error("error fetching messages", "error", err)
|
||||
time.Sleep(time.Second)
|
||||
continue
|
||||
}
|
||||
|
||||
for _, msg := range msgs {
|
||||
if err := w.handler(ctx, msg); err != nil {
|
||||
w.log.Error("error processing message", "error", err)
|
||||
msg.Nak()
|
||||
} else {
|
||||
msg.Ack()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (w *Worker) Shutdown() {
|
||||
if w.sub != nil {
|
||||
w.sub.Drain()
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user