From ca758cc708ab3ce77cd8e9f9801caca25b83c649 Mon Sep 17 00:00:00 2001 From: Derek Wright Date: Fri, 19 Dec 2025 09:59:00 -0500 Subject: [PATCH] added nats + worker modules --- auth/auth0/auth0.go | 29 ++---- auth/auth0/handlers_test.go | 9 +- auth/auth0/options.go | 17 ++++ go.mod | 17 +++- go.sum | 27 ++++++ nats/errors.go | 5 ++ nats/nats.go | 68 ++++++++++++++ nats/options.go | 74 ++++++++++++++++ worker/README.md | 94 ++++++++++++++++++++ worker/errors.go | 11 +++ worker/options.go | 67 ++++++++++++++ worker/worker.go | 172 ++++++++++++++++++++++++++++++++++++ 12 files changed, 559 insertions(+), 31 deletions(-) create mode 100644 auth/auth0/options.go create mode 100644 nats/errors.go create mode 100644 nats/nats.go create mode 100644 nats/options.go create mode 100644 worker/README.md create mode 100644 worker/errors.go create mode 100644 worker/options.go create mode 100644 worker/worker.go diff --git a/auth/auth0/auth0.go b/auth/auth0/auth0.go index b607e71..dc9d78c 100644 --- a/auth/auth0/auth0.go +++ b/auth/auth0/auth0.go @@ -4,6 +4,7 @@ import ( "context" "encoding/gob" "fmt" + "log/slog" "net/http" "net/url" @@ -16,45 +17,27 @@ func init() { gob.Register(SessionUser{}) } -type Logger interface { - Debug(msg string, args ...any) - Info(msg string, args ...any) - Error(msg string, args ...any) -} - type SessionManager interface { Get(ctx context.Context, key string) any Put(ctx context.Context, key string, value any) } type Config struct { - Logger Logger + Logger *slog.Logger Sessions SessionManager } -type Option func(deps *Config) - -func WithLogger(l Logger) Option { - return func(cfg *Config) { - cfg.Logger = l - } -} - -func WithSessions(s SessionManager) Option { - return func(cfg *Config) { - cfg.Sessions = s - } -} - type deps struct { auth *authenticator.Authenticator logoutBase *url.URL - log Logger + log *slog.Logger sessions SessionManager } func New(opts ...Option) (func(chi.Router), Middleware, error) { - cfg := Config{} + cfg := Config{ + Logger: slog.Default(), + } for _, opt := range opts { opt(&cfg) diff --git a/auth/auth0/handlers_test.go b/auth/auth0/handlers_test.go index a711635..36938b4 100644 --- a/auth/auth0/handlers_test.go +++ b/auth/auth0/handlers_test.go @@ -2,6 +2,7 @@ package auth0 import ( "context" + "log/slog" "net/http/httptest" "strings" "sync" @@ -12,12 +13,6 @@ import ( "git.citc.tech/go/web/auth/auth0/authenticator" ) -type mockLogger struct{} - -func (m *mockLogger) Debug(msg string, args ...any) {} -func (m *mockLogger) Info(msg string, args ...any) {} -func (m *mockLogger) Error(msg string, args ...any) {} - type mockSessionManager struct { store map[string]any mu sync.RWMutex @@ -65,7 +60,7 @@ func TestHandleLogic(t *testing.T) { } d := &deps{ - log: &mockLogger{}, + log: slog.Default(), sessions: mockSessions, auth: &authenticator.Authenticator{ Config: oauth2.Config{ diff --git a/auth/auth0/options.go b/auth/auth0/options.go new file mode 100644 index 0000000..9febe16 --- /dev/null +++ b/auth/auth0/options.go @@ -0,0 +1,17 @@ +package auth0 + +import "log/slog" + +type Option func(deps *Config) + +func WithLogger(l *slog.Logger) Option { + return func(cfg *Config) { + cfg.Logger = l + } +} + +func WithSessions(s SessionManager) Option { + return func(cfg *Config) { + cfg.Sessions = s + } +} diff --git a/go.mod b/go.mod index 6663764..2704142 100644 --- a/go.mod +++ b/go.mod @@ -9,4 +9,19 @@ require ( golang.org/x/oauth2 v0.34.0 ) -require github.com/go-jose/go-jose/v4 v4.1.3 // indirect +require ( + github.com/antithesishq/antithesis-sdk-go v0.5.0-default-no-op // indirect + github.com/go-jose/go-jose/v4 v4.1.3 // indirect + github.com/google/go-tpm v0.9.7 // indirect + github.com/klauspost/compress v1.18.2 // indirect + github.com/minio/highwayhash v1.0.4-0.20251030100505-070ab1a87a76 // indirect + github.com/nats-io/jwt/v2 v2.8.0 // indirect + github.com/nats-io/nats-server/v2 v2.12.3 // indirect + github.com/nats-io/nats.go v1.48.0 // indirect + github.com/nats-io/nkeys v0.4.12 // indirect + github.com/nats-io/nuid v1.0.1 // indirect + go.uber.org/automaxprocs v1.6.0 // indirect + golang.org/x/crypto v0.46.0 // indirect + golang.org/x/sys v0.39.0 // indirect + golang.org/x/time v0.14.0 // indirect +) diff --git a/go.sum b/go.sum index 0a42825..98c8c94 100644 --- a/go.sum +++ b/go.sum @@ -1,10 +1,37 @@ +github.com/antithesishq/antithesis-sdk-go v0.5.0-default-no-op h1:Ucf+QxEKMbPogRO5guBNe5cgd9uZgfoJLOYs8WWhtjM= +github.com/antithesishq/antithesis-sdk-go v0.5.0-default-no-op/go.mod h1:IUpT2DPAKh6i/YhSbt6Gl3v2yvUZjmKncl7U91fup7E= github.com/coreos/go-oidc/v3 v3.17.0 h1:hWBGaQfbi0iVviX4ibC7bk8OKT5qNr4klBaCHVNvehc= github.com/coreos/go-oidc/v3 v3.17.0/go.mod h1:wqPbKFrVnE90vty060SB40FCJ8fTHTxSwyXJqZH+sI8= github.com/go-chi/chi/v5 v5.2.3 h1:WQIt9uxdsAbgIYgid+BpYc+liqQZGMHRaUwp0JUcvdE= github.com/go-chi/chi/v5 v5.2.3/go.mod h1:L2yAIGWB3H+phAw1NxKwWM+7eUH/lU8pOMm5hHcoops= github.com/go-jose/go-jose/v4 v4.1.3 h1:CVLmWDhDVRa6Mi/IgCgaopNosCaHz7zrMeF9MlZRkrs= github.com/go-jose/go-jose/v4 v4.1.3/go.mod h1:x4oUasVrzR7071A4TnHLGSPpNOm2a21K9Kf04k1rs08= +github.com/google/go-tpm v0.9.7 h1:u89J4tUUeDTlH8xxC3CTW7OHZjbjKoHdQ9W7gCUhtxA= +github.com/google/go-tpm v0.9.7/go.mod h1:h9jEsEECg7gtLis0upRBQU+GhYVH6jMjrFxI8u6bVUY= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/klauspost/compress v1.18.2 h1:iiPHWW0YrcFgpBYhsA6D1+fqHssJscY/Tm/y2Uqnapk= +github.com/klauspost/compress v1.18.2/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4= +github.com/minio/highwayhash v1.0.4-0.20251030100505-070ab1a87a76 h1:KGuD/pM2JpL9FAYvBrnBBeENKZNh6eNtjqytV6TYjnk= +github.com/minio/highwayhash v1.0.4-0.20251030100505-070ab1a87a76/go.mod h1:GGYsuwP/fPD6Y9hMiXuapVvlIUEhFhMTh0rxU3ik1LQ= +github.com/nats-io/jwt/v2 v2.8.0 h1:K7uzyz50+yGZDO5o772eRE7atlcSEENpL7P+b74JV1g= +github.com/nats-io/jwt/v2 v2.8.0/go.mod h1:me11pOkwObtcBNR8AiMrUbtVOUGkqYjMQZ6jnSdVUIA= +github.com/nats-io/nats-server/v2 v2.12.3 h1:KRv+1n7lddMVgkJPQer+pt36TcO0ENxjilBmeWdjcHs= +github.com/nats-io/nats-server/v2 v2.12.3/go.mod h1:MQXjG9WjyXKz9koWzUc3jYUMKD8x3CLmTNy91IQQz3Y= +github.com/nats-io/nats.go v1.48.0 h1:pSFyXApG+yWU/TgbKCjmm5K4wrHu86231/w84qRVR+U= +github.com/nats-io/nats.go v1.48.0/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g= +github.com/nats-io/nkeys v0.4.12 h1:nssm7JKOG9/x4J8II47VWCL1Ds29avyiQDRn0ckMvDc= +github.com/nats-io/nkeys v0.4.12/go.mod h1:MT59A1HYcjIcyQDJStTfaOY6vhy9XTUjOFo+SVsvpBg= +github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= +github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= +go.uber.org/automaxprocs v1.6.0 h1:O3y2/QNTOdbF+e/dpXNNW7Rx2hZ4sTIPyybbxyNqTUs= +go.uber.org/automaxprocs v1.6.0/go.mod h1:ifeIMSnPZuznNm6jmdzmU3/bfk01Fe2fotchwEFJ8r8= +golang.org/x/crypto v0.46.0 h1:cKRW/pmt1pKAfetfu+RCEvjvZkA9RimPbh7bhFjGVBU= +golang.org/x/crypto v0.46.0/go.mod h1:Evb/oLKmMraqjZ2iQTwDwvCtJkczlDuTmdJXoZVzqU0= golang.org/x/oauth2 v0.34.0 h1:hqK/t4AKgbqWkdkcAeI8XLmbK+4m4G5YeQRrmiotGlw= golang.org/x/oauth2 v0.34.0/go.mod h1:lzm5WQJQwKZ3nwavOZ3IS5Aulzxi68dUSgRHujetwEA= +golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.39.0 h1:CvCKL8MeisomCi6qNZ+wbb0DN9E5AATixKsvNtMoMFk= +golang.org/x/sys v0.39.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= +golang.org/x/time v0.14.0 h1:MRx4UaLrDotUKUdCIqzPC48t1Y9hANFKIRpNx+Te8PI= +golang.org/x/time v0.14.0/go.mod h1:eL/Oa2bBBK0TkX57Fyni+NgnyQQN4LitPmob2Hjnqw4= diff --git a/nats/errors.go b/nats/errors.go new file mode 100644 index 0000000..b3dcba7 --- /dev/null +++ b/nats/errors.go @@ -0,0 +1,5 @@ +package nats + +import "errors" + +var ErrNotReady = errors.New("nats server not ready after ready check timeout") diff --git a/nats/nats.go b/nats/nats.go new file mode 100644 index 0000000..ac73f35 --- /dev/null +++ b/nats/nats.go @@ -0,0 +1,68 @@ +package nats + +import ( + "time" + + "github.com/nats-io/nats-server/v2/server" + "github.com/nats-io/nats.go" +) + +func New(opts ...Option) (*nats.Conn, func(), error) { + cfg := &config{ + readyTimeout: 5 * time.Second, + } + + for _, opt := range opts { + opt(cfg) + } + + var nc *nats.Conn + var shutdown func() + + if cfg.externalURL != "" { + var err error + nc, err = nats.Connect(cfg.externalURL, cfg.connectOpts...) + if err != nil { + return nil, nil, err + } + shutdown = func() { nc.Close() } + return nc, shutdown, nil + } + + serverOpts := cfg.serverOpts + if serverOpts == nil { + serverOpts = &server.Options{ + Host: "127.0.0.1", + Port: -1, + NoLog: true, + } + } + + s, err := server.NewServer(serverOpts) + if err != nil { + return nil, nil, err + } + + go s.Start() + + if !s.ReadyForConnections(cfg.readyTimeout) { + s.Shutdown() + return nil, nil, ErrNotReady + } + + nc, err = nats.Connect( + s.ClientURL(), + append(cfg.connectOpts, nats.InProcessServer(s), nats.Name("embedded-client"))..., + ) + if err != nil { + s.Shutdown() + return nil, nil, err + } + + shutdown = func() { + nc.Close() + s.Shutdown() + } + + return nc, shutdown, nil +} diff --git a/nats/options.go b/nats/options.go new file mode 100644 index 0000000..f03ee9a --- /dev/null +++ b/nats/options.go @@ -0,0 +1,74 @@ +package nats + +import ( + "strings" + "time" + + "github.com/nats-io/nats-server/v2/server" + "github.com/nats-io/nats.go" +) + +type Option func(*config) + +type config struct { + serverOpts *server.Options + connectOpts []nats.Option + externalURL string + readyTimeout time.Duration + shutdownTimeout time.Duration +} + +func WithExternalURL(url string) Option { + return func(c *config) { c.externalURL = url } +} + +func WithServerOpts(opts *server.Options) Option { + return func(c *config) { c.serverOpts = opts } +} + +func WithConnectOpts(opts ...nats.Option) Option { + return func(c *config) { c.connectOpts = append(c.connectOpts, opts...) } +} + +func WithReadyTimeout(d time.Duration) Option { + return func(c *config) { c.readyTimeout = d } +} + +func WithCluster(name string, routes []string) Option { + return func(c *config) { + if c.serverOpts == nil { + c.serverOpts = &server.Options{} + } + c.serverOpts.Cluster = server.ClusterOpts{ + Name: name, + } + c.serverOpts.Routes = server.RoutesFromStr(strings.Join(routes, ",")) + } +} + +func WithClusterListen() Option { + return func(c *config) { + if c.serverOpts == nil { + c.serverOpts = &server.Options{} + } + c.serverOpts.Cluster.ListenStr = "0.0.0.0:6222" + } +} + +func WithJetStream(enabled bool, storeDir ...string) Option { + return func(c *config) { + if c.serverOpts == nil { + c.serverOpts = &server.Options{} + } + c.serverOpts.JetStream = enabled + if len(storeDir) > 0 { + c.serverOpts.StoreDir = storeDir[0] + } + } +} + +func WithClientName(name string) Option { + return func(c *config) { + c.connectOpts = append(c.connectOpts, nats.Name(name)) + } +} diff --git a/worker/README.md b/worker/README.md new file mode 100644 index 0000000..7146af7 --- /dev/null +++ b/worker/README.md @@ -0,0 +1,94 @@ +# go-worker + +A simple, reusable, production-ready background worker module for NATS JetStream in Go. + +This module allows you to easily run reliable, concurrent background workers that consume messages from JetStream streams using pull-based subscriptions. It integrates seamlessly with the `go-nats` embedded/clustered NATS module and follows the same clean, functional-options pattern as the rest of your toolkit. + +## Features + +- Pull-based JetStream consumer with configurable batch fetching +- Concurrent message processing (configurable workers) +- Automatic stream and consumer creation (idempotent) +- Configurable storage (Memory or File) for development vs production durability +- Graceful shutdown with context cancellation and subscription draining +- Explicit Ack/Nak handling with retry support +- Custom logger injection +- Works with embedded or external NATS servers + +## Installation + +```bash +go get git.citc.tech/go/web/worker +``` + +## Quick Start + +```go +package main + +import ( + "context" + "log/slog" + "os" + + "git.citc.tech/go/web/nats" + "git.citc.tech/go/web/worker" +) + +func main() { + logger := slog.New(slog.NewJSONHandler(os.Stdout, nil)) + + // Connect to NATS (embedded or external) + nc, natsShutdown, err := nats.New() + if err != nil { + logger.Error("failed to connect to NATS", "error", err) + os.Exit(1) + } + defer natsShutdown() + + // Create background worker + w, err := worker.New(nc, + worker.WithStream("ORDERS"), + worker.WithConsumer("order-processor", "orders.process"), + worker.WithHandler(func(ctx context.Context, msg *nats.Msg) error { + logger.Info("processing order", "data", string(msg.Data)) + // Your business logic here + return nil + }), + worker.WithConcurrency(5), + worker.WithStorage(nats.FileStorage), // or MemoryStorage for dev + worker.WithLogger(logger), + ) + if err != nil { + logger.Error("failed to create worker", "error", err) + os.Exit(1) + } + + // Run worker (blocks until shutdown signal) + go w.Run() + + // Your main application (e.g. HTTP server) runs here... + + // On exit + defer w.Shutdown() +} +``` + +## Options + +| Option | Description | Default | +|-------------------------|--------------------------------------------------|------------------------| +| `WithStream(name)` | JetStream stream name | required | +| `WithConsumer(name, subject)` | Consumer name and filter subject | required | +| `WithDurable(name)` | Durable consumer name (if different from consumer) | uses consumer name | +| `WithHandler(h)` | Message processing function | required | +| `WithConcurrency(n)` | Number of concurrent goroutines | 1 | +| `WithAckWait(d)` | Time to wait for acknowledgment | 30s | +| `WithMaxDeliver(n)` | Maximum delivery attempts | 1 | +| `WithLogger(l)` | Custom slog logger | slog.Default() | +| `WithStorage(t)` | Stream storage type (MemoryStorage or FileStorage) | FileStorage (safe) | + +## Storage Types + +- `nats.MemoryStorage` — Fast, in-memory only (lost on restart). Great for dev/testing. +- `nats.FileStorage` — Persistent on disk (survives restarts). Recommended for production. \ No newline at end of file diff --git a/worker/errors.go b/worker/errors.go new file mode 100644 index 0000000..4d51d48 --- /dev/null +++ b/worker/errors.go @@ -0,0 +1,11 @@ +package worker + +import "errors" + +var ( + ErrInvalidHandler = errors.New("handler is nil") + ErrStreamNotFound = errors.New("stream not found") + ErrConsumerNotFound = errors.New("consumer not found") + ErrStreamNameRequired = errors.New("stream name required") + ErrSubjectRequired = errors.New("subject required") +) diff --git a/worker/options.go b/worker/options.go new file mode 100644 index 0000000..cc602a1 --- /dev/null +++ b/worker/options.go @@ -0,0 +1,67 @@ +package worker + +import ( + "context" + "log/slog" + "time" + + "github.com/nats-io/nats.go" +) + +type Handler func(context.Context, *nats.Msg) error + +type Option func(*config) + +type config struct { + streamName string + consumerName string + durableName string + handler Handler + concurrency int + ackWait time.Duration + maxDeliver int + filterSubject string + deliverPolicy nats.DeliverPolicy + replayPolicy nats.ReplayPolicy + log *slog.Logger + storage nats.StorageType +} + +func WithStream(name string) Option { + return func(c *config) { c.streamName = name } +} + +func WithConsumer(consumer, subject string) Option { + return func(c *config) { + c.consumerName = consumer + c.filterSubject = subject + } +} + +func WithDurable(name string) Option { + return func(c *config) { c.durableName = name } +} + +func WithHandler(h Handler) Option { + return func(c *config) { c.handler = h } +} + +func WithConcurrency(n int) Option { + return func(c *config) { c.concurrency = n } +} + +func WithAckWait(d time.Duration) Option { + return func(c *config) { c.ackWait = d } +} + +func WithMaxDeliver(n int) Option { + return func(c *config) { c.maxDeliver = n } +} + +func WithLogger(l *slog.Logger) Option { + return func(c *config) { c.log = l } +} + +func WithStorage(storage nats.StorageType) Option { + return func(c *config) { c.storage = storage } +} diff --git a/worker/worker.go b/worker/worker.go new file mode 100644 index 0000000..6f98821 --- /dev/null +++ b/worker/worker.go @@ -0,0 +1,172 @@ +package worker + +import ( + "context" + "errors" + "log/slog" + "os" + "os/signal" + "sync" + "syscall" + "time" + + "github.com/nats-io/nats.go" +) + +type Worker struct { + js nats.JetStreamContext + sub *nats.Subscription + handler Handler + log *slog.Logger + concurrency int +} + +func New(nc *nats.Conn, opts ...Option) (*Worker, error) { + cfg := &config{ + concurrency: 1, + ackWait: 30 * time.Second, + maxDeliver: 1, + deliverPolicy: nats.DeliverAllPolicy, + replayPolicy: nats.ReplayInstantPolicy, + log: slog.Default(), + storage: nats.FileStorage, + } + + for _, opt := range opts { + opt(cfg) + } + + if cfg.handler == nil { + return nil, ErrInvalidHandler + } + + if cfg.streamName == "" { + return nil, ErrStreamNameRequired + } + + if cfg.filterSubject == "" { + return nil, ErrSubjectRequired + } + + js, err := nc.JetStream() + if err != nil { + return nil, err + } + + w := &Worker{ + js: js, + log: cfg.log, + handler: cfg.handler, + concurrency: cfg.concurrency, + } + + _, err = js.StreamInfo(cfg.streamName) + if err != nil { + if errors.Is(err, nats.ErrStreamNotFound) { + w.log.Info("creating stream", "name", cfg.streamName) + + _, err = js.AddStream(&nats.StreamConfig{ + Name: cfg.streamName, + Subjects: []string{cfg.filterSubject + ".>"}, + Storage: cfg.storage, + }) + if err != nil { + return nil, err + } + } else { + return nil, err + } + } + + if cfg.consumerName == "" { + cfg.consumerName = cfg.durableName + } + + consumerCfg := &nats.ConsumerConfig{ + Durable: cfg.durableName, + AckPolicy: nats.AckExplicitPolicy, + AckWait: cfg.ackWait, + MaxDeliver: cfg.maxDeliver, + FilterSubject: cfg.filterSubject, + DeliverPolicy: cfg.deliverPolicy, + ReplayPolicy: cfg.replayPolicy, + } + + _, err = js.AddConsumer(cfg.streamName, consumerCfg) + if err != nil { + return nil, err + } + + w.sub, err = js.PullSubscribe(cfg.filterSubject, cfg.consumerName, nats.BindStream(cfg.streamName)) + if err != nil { + return nil, err + } + + return w, nil +} + +func (w *Worker) Run() { + w.log.Info("background worker started") + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + sigs := make(chan os.Signal, 1) + signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) + go func() { + <-sigs + w.log.Info("shutting down background worker") + cancel() + }() + + var wg sync.WaitGroup + for i := 0; i < w.concurrency; i++ { + wg.Add(1) + go func(id int) { + defer wg.Done() + w.processMessages(ctx, id) + }(i) + } + + wg.Wait() + w.log.Info("background worker stopped") +} + +func (w *Worker) processMessages(ctx context.Context, workerID int) { + batchSize := 10 + for { + select { + case <-ctx.Done(): + return + default: + msgs, err := w.sub.Fetch( + batchSize, + nats.Context(ctx), + nats.MaxWait(5*time.Second), + ) + if err != nil { + if errors.Is(err, nats.ErrTimeout) { + continue + } + w.log.Error("error fetching messages", "error", err) + time.Sleep(time.Second) + continue + } + + for _, msg := range msgs { + if err := w.handler(ctx, msg); err != nil { + w.log.Error("error processing message", "error", err) + msg.Nak() + } else { + msg.Ack() + } + } + } + } +} + +func (w *Worker) Shutdown() { + if w.sub != nil { + w.sub.Drain() + } +}