2 Commits

Author SHA1 Message Date
ca758cc708 added nats + worker modules 2025-12-19 09:59:00 -05:00
b457c74d49 fix auth stuff 2025-12-18 14:12:00 -05:00
13 changed files with 561 additions and 33 deletions

View File

@@ -4,6 +4,7 @@ import (
"context" "context"
"encoding/gob" "encoding/gob"
"fmt" "fmt"
"log/slog"
"net/http" "net/http"
"net/url" "net/url"
@@ -16,45 +17,27 @@ func init() {
gob.Register(SessionUser{}) gob.Register(SessionUser{})
} }
type Logger interface {
Debug(msg string, args ...any)
Info(msg string, args ...any)
Error(msg string, args ...any)
}
type SessionManager interface { type SessionManager interface {
Get(ctx context.Context, key string) any Get(ctx context.Context, key string) any
Put(ctx context.Context, key string, value any) Put(ctx context.Context, key string, value any)
} }
type Config struct { type Config struct {
Logger Logger Logger *slog.Logger
Sessions SessionManager Sessions SessionManager
} }
type Option func(deps *Config)
func WithLogger(l Logger) Option {
return func(cfg *Config) {
cfg.Logger = l
}
}
func WithSessions(s SessionManager) Option {
return func(cfg *Config) {
cfg.Sessions = s
}
}
type deps struct { type deps struct {
auth *authenticator.Authenticator auth *authenticator.Authenticator
logoutBase *url.URL logoutBase *url.URL
log Logger log *slog.Logger
sessions SessionManager sessions SessionManager
} }
func New(opts ...Option) (func(chi.Router), Middleware, error) { func New(opts ...Option) (func(chi.Router), Middleware, error) {
cfg := Config{} cfg := Config{
Logger: slog.Default(),
}
for _, opt := range opts { for _, opt := range opts {
opt(&cfg) opt(&cfg)

View File

@@ -2,6 +2,7 @@ package auth0
import ( import (
"context" "context"
"log/slog"
"net/http/httptest" "net/http/httptest"
"strings" "strings"
"sync" "sync"
@@ -12,12 +13,6 @@ import (
"git.citc.tech/go/web/auth/auth0/authenticator" "git.citc.tech/go/web/auth/auth0/authenticator"
) )
type mockLogger struct{}
func (m *mockLogger) Debug(msg string, args ...any) {}
func (m *mockLogger) Info(msg string, args ...any) {}
func (m *mockLogger) Error(msg string, args ...any) {}
type mockSessionManager struct { type mockSessionManager struct {
store map[string]any store map[string]any
mu sync.RWMutex mu sync.RWMutex
@@ -65,7 +60,7 @@ func TestHandleLogic(t *testing.T) {
} }
d := &deps{ d := &deps{
log: &mockLogger{}, log: slog.Default(),
sessions: mockSessions, sessions: mockSessions,
auth: &authenticator.Authenticator{ auth: &authenticator.Authenticator{
Config: oauth2.Config{ Config: oauth2.Config{

View File

@@ -33,6 +33,6 @@ func authenticatedMiddleware(deps *deps, next http.Handler) http.Handler {
}) })
} }
func CurrentUser(r *http.Request) any { func CurrentUser(r *http.Request) SessionUser {
return r.Context().Value(userContextKey{}) return r.Context().Value(userContextKey{}).(SessionUser)
} }

17
auth/auth0/options.go Normal file
View File

@@ -0,0 +1,17 @@
package auth0
import "log/slog"
type Option func(deps *Config)
func WithLogger(l *slog.Logger) Option {
return func(cfg *Config) {
cfg.Logger = l
}
}
func WithSessions(s SessionManager) Option {
return func(cfg *Config) {
cfg.Sessions = s
}
}

17
go.mod
View File

@@ -9,4 +9,19 @@ require (
golang.org/x/oauth2 v0.34.0 golang.org/x/oauth2 v0.34.0
) )
require github.com/go-jose/go-jose/v4 v4.1.3 // indirect require (
github.com/antithesishq/antithesis-sdk-go v0.5.0-default-no-op // indirect
github.com/go-jose/go-jose/v4 v4.1.3 // indirect
github.com/google/go-tpm v0.9.7 // indirect
github.com/klauspost/compress v1.18.2 // indirect
github.com/minio/highwayhash v1.0.4-0.20251030100505-070ab1a87a76 // indirect
github.com/nats-io/jwt/v2 v2.8.0 // indirect
github.com/nats-io/nats-server/v2 v2.12.3 // indirect
github.com/nats-io/nats.go v1.48.0 // indirect
github.com/nats-io/nkeys v0.4.12 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
go.uber.org/automaxprocs v1.6.0 // indirect
golang.org/x/crypto v0.46.0 // indirect
golang.org/x/sys v0.39.0 // indirect
golang.org/x/time v0.14.0 // indirect
)

27
go.sum
View File

@@ -1,10 +1,37 @@
github.com/antithesishq/antithesis-sdk-go v0.5.0-default-no-op h1:Ucf+QxEKMbPogRO5guBNe5cgd9uZgfoJLOYs8WWhtjM=
github.com/antithesishq/antithesis-sdk-go v0.5.0-default-no-op/go.mod h1:IUpT2DPAKh6i/YhSbt6Gl3v2yvUZjmKncl7U91fup7E=
github.com/coreos/go-oidc/v3 v3.17.0 h1:hWBGaQfbi0iVviX4ibC7bk8OKT5qNr4klBaCHVNvehc= github.com/coreos/go-oidc/v3 v3.17.0 h1:hWBGaQfbi0iVviX4ibC7bk8OKT5qNr4klBaCHVNvehc=
github.com/coreos/go-oidc/v3 v3.17.0/go.mod h1:wqPbKFrVnE90vty060SB40FCJ8fTHTxSwyXJqZH+sI8= github.com/coreos/go-oidc/v3 v3.17.0/go.mod h1:wqPbKFrVnE90vty060SB40FCJ8fTHTxSwyXJqZH+sI8=
github.com/go-chi/chi/v5 v5.2.3 h1:WQIt9uxdsAbgIYgid+BpYc+liqQZGMHRaUwp0JUcvdE= github.com/go-chi/chi/v5 v5.2.3 h1:WQIt9uxdsAbgIYgid+BpYc+liqQZGMHRaUwp0JUcvdE=
github.com/go-chi/chi/v5 v5.2.3/go.mod h1:L2yAIGWB3H+phAw1NxKwWM+7eUH/lU8pOMm5hHcoops= github.com/go-chi/chi/v5 v5.2.3/go.mod h1:L2yAIGWB3H+phAw1NxKwWM+7eUH/lU8pOMm5hHcoops=
github.com/go-jose/go-jose/v4 v4.1.3 h1:CVLmWDhDVRa6Mi/IgCgaopNosCaHz7zrMeF9MlZRkrs= github.com/go-jose/go-jose/v4 v4.1.3 h1:CVLmWDhDVRa6Mi/IgCgaopNosCaHz7zrMeF9MlZRkrs=
github.com/go-jose/go-jose/v4 v4.1.3/go.mod h1:x4oUasVrzR7071A4TnHLGSPpNOm2a21K9Kf04k1rs08= github.com/go-jose/go-jose/v4 v4.1.3/go.mod h1:x4oUasVrzR7071A4TnHLGSPpNOm2a21K9Kf04k1rs08=
github.com/google/go-tpm v0.9.7 h1:u89J4tUUeDTlH8xxC3CTW7OHZjbjKoHdQ9W7gCUhtxA=
github.com/google/go-tpm v0.9.7/go.mod h1:h9jEsEECg7gtLis0upRBQU+GhYVH6jMjrFxI8u6bVUY=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/klauspost/compress v1.18.2 h1:iiPHWW0YrcFgpBYhsA6D1+fqHssJscY/Tm/y2Uqnapk=
github.com/klauspost/compress v1.18.2/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4=
github.com/minio/highwayhash v1.0.4-0.20251030100505-070ab1a87a76 h1:KGuD/pM2JpL9FAYvBrnBBeENKZNh6eNtjqytV6TYjnk=
github.com/minio/highwayhash v1.0.4-0.20251030100505-070ab1a87a76/go.mod h1:GGYsuwP/fPD6Y9hMiXuapVvlIUEhFhMTh0rxU3ik1LQ=
github.com/nats-io/jwt/v2 v2.8.0 h1:K7uzyz50+yGZDO5o772eRE7atlcSEENpL7P+b74JV1g=
github.com/nats-io/jwt/v2 v2.8.0/go.mod h1:me11pOkwObtcBNR8AiMrUbtVOUGkqYjMQZ6jnSdVUIA=
github.com/nats-io/nats-server/v2 v2.12.3 h1:KRv+1n7lddMVgkJPQer+pt36TcO0ENxjilBmeWdjcHs=
github.com/nats-io/nats-server/v2 v2.12.3/go.mod h1:MQXjG9WjyXKz9koWzUc3jYUMKD8x3CLmTNy91IQQz3Y=
github.com/nats-io/nats.go v1.48.0 h1:pSFyXApG+yWU/TgbKCjmm5K4wrHu86231/w84qRVR+U=
github.com/nats-io/nats.go v1.48.0/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g=
github.com/nats-io/nkeys v0.4.12 h1:nssm7JKOG9/x4J8II47VWCL1Ds29avyiQDRn0ckMvDc=
github.com/nats-io/nkeys v0.4.12/go.mod h1:MT59A1HYcjIcyQDJStTfaOY6vhy9XTUjOFo+SVsvpBg=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
go.uber.org/automaxprocs v1.6.0 h1:O3y2/QNTOdbF+e/dpXNNW7Rx2hZ4sTIPyybbxyNqTUs=
go.uber.org/automaxprocs v1.6.0/go.mod h1:ifeIMSnPZuznNm6jmdzmU3/bfk01Fe2fotchwEFJ8r8=
golang.org/x/crypto v0.46.0 h1:cKRW/pmt1pKAfetfu+RCEvjvZkA9RimPbh7bhFjGVBU=
golang.org/x/crypto v0.46.0/go.mod h1:Evb/oLKmMraqjZ2iQTwDwvCtJkczlDuTmdJXoZVzqU0=
golang.org/x/oauth2 v0.34.0 h1:hqK/t4AKgbqWkdkcAeI8XLmbK+4m4G5YeQRrmiotGlw= golang.org/x/oauth2 v0.34.0 h1:hqK/t4AKgbqWkdkcAeI8XLmbK+4m4G5YeQRrmiotGlw=
golang.org/x/oauth2 v0.34.0/go.mod h1:lzm5WQJQwKZ3nwavOZ3IS5Aulzxi68dUSgRHujetwEA= golang.org/x/oauth2 v0.34.0/go.mod h1:lzm5WQJQwKZ3nwavOZ3IS5Aulzxi68dUSgRHujetwEA=
golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.39.0 h1:CvCKL8MeisomCi6qNZ+wbb0DN9E5AATixKsvNtMoMFk=
golang.org/x/sys v0.39.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
golang.org/x/time v0.14.0 h1:MRx4UaLrDotUKUdCIqzPC48t1Y9hANFKIRpNx+Te8PI=
golang.org/x/time v0.14.0/go.mod h1:eL/Oa2bBBK0TkX57Fyni+NgnyQQN4LitPmob2Hjnqw4=

5
nats/errors.go Normal file
View File

@@ -0,0 +1,5 @@
package nats
import "errors"
var ErrNotReady = errors.New("nats server not ready after ready check timeout")

68
nats/nats.go Normal file
View File

@@ -0,0 +1,68 @@
package nats
import (
"time"
"github.com/nats-io/nats-server/v2/server"
"github.com/nats-io/nats.go"
)
func New(opts ...Option) (*nats.Conn, func(), error) {
cfg := &config{
readyTimeout: 5 * time.Second,
}
for _, opt := range opts {
opt(cfg)
}
var nc *nats.Conn
var shutdown func()
if cfg.externalURL != "" {
var err error
nc, err = nats.Connect(cfg.externalURL, cfg.connectOpts...)
if err != nil {
return nil, nil, err
}
shutdown = func() { nc.Close() }
return nc, shutdown, nil
}
serverOpts := cfg.serverOpts
if serverOpts == nil {
serverOpts = &server.Options{
Host: "127.0.0.1",
Port: -1,
NoLog: true,
}
}
s, err := server.NewServer(serverOpts)
if err != nil {
return nil, nil, err
}
go s.Start()
if !s.ReadyForConnections(cfg.readyTimeout) {
s.Shutdown()
return nil, nil, ErrNotReady
}
nc, err = nats.Connect(
s.ClientURL(),
append(cfg.connectOpts, nats.InProcessServer(s), nats.Name("embedded-client"))...,
)
if err != nil {
s.Shutdown()
return nil, nil, err
}
shutdown = func() {
nc.Close()
s.Shutdown()
}
return nc, shutdown, nil
}

74
nats/options.go Normal file
View File

@@ -0,0 +1,74 @@
package nats
import (
"strings"
"time"
"github.com/nats-io/nats-server/v2/server"
"github.com/nats-io/nats.go"
)
type Option func(*config)
type config struct {
serverOpts *server.Options
connectOpts []nats.Option
externalURL string
readyTimeout time.Duration
shutdownTimeout time.Duration
}
func WithExternalURL(url string) Option {
return func(c *config) { c.externalURL = url }
}
func WithServerOpts(opts *server.Options) Option {
return func(c *config) { c.serverOpts = opts }
}
func WithConnectOpts(opts ...nats.Option) Option {
return func(c *config) { c.connectOpts = append(c.connectOpts, opts...) }
}
func WithReadyTimeout(d time.Duration) Option {
return func(c *config) { c.readyTimeout = d }
}
func WithCluster(name string, routes []string) Option {
return func(c *config) {
if c.serverOpts == nil {
c.serverOpts = &server.Options{}
}
c.serverOpts.Cluster = server.ClusterOpts{
Name: name,
}
c.serverOpts.Routes = server.RoutesFromStr(strings.Join(routes, ","))
}
}
func WithClusterListen() Option {
return func(c *config) {
if c.serverOpts == nil {
c.serverOpts = &server.Options{}
}
c.serverOpts.Cluster.ListenStr = "0.0.0.0:6222"
}
}
func WithJetStream(enabled bool, storeDir ...string) Option {
return func(c *config) {
if c.serverOpts == nil {
c.serverOpts = &server.Options{}
}
c.serverOpts.JetStream = enabled
if len(storeDir) > 0 {
c.serverOpts.StoreDir = storeDir[0]
}
}
}
func WithClientName(name string) Option {
return func(c *config) {
c.connectOpts = append(c.connectOpts, nats.Name(name))
}
}

94
worker/README.md Normal file
View File

@@ -0,0 +1,94 @@
# go-worker
A simple, reusable, production-ready background worker module for NATS JetStream in Go.
This module allows you to easily run reliable, concurrent background workers that consume messages from JetStream streams using pull-based subscriptions. It integrates seamlessly with the `go-nats` embedded/clustered NATS module and follows the same clean, functional-options pattern as the rest of your toolkit.
## Features
- Pull-based JetStream consumer with configurable batch fetching
- Concurrent message processing (configurable workers)
- Automatic stream and consumer creation (idempotent)
- Configurable storage (Memory or File) for development vs production durability
- Graceful shutdown with context cancellation and subscription draining
- Explicit Ack/Nak handling with retry support
- Custom logger injection
- Works with embedded or external NATS servers
## Installation
```bash
go get git.citc.tech/go/web/worker
```
## Quick Start
```go
package main
import (
"context"
"log/slog"
"os"
"git.citc.tech/go/web/nats"
"git.citc.tech/go/web/worker"
)
func main() {
logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))
// Connect to NATS (embedded or external)
nc, natsShutdown, err := nats.New()
if err != nil {
logger.Error("failed to connect to NATS", "error", err)
os.Exit(1)
}
defer natsShutdown()
// Create background worker
w, err := worker.New(nc,
worker.WithStream("ORDERS"),
worker.WithConsumer("order-processor", "orders.process"),
worker.WithHandler(func(ctx context.Context, msg *nats.Msg) error {
logger.Info("processing order", "data", string(msg.Data))
// Your business logic here
return nil
}),
worker.WithConcurrency(5),
worker.WithStorage(nats.FileStorage), // or MemoryStorage for dev
worker.WithLogger(logger),
)
if err != nil {
logger.Error("failed to create worker", "error", err)
os.Exit(1)
}
// Run worker (blocks until shutdown signal)
go w.Run()
// Your main application (e.g. HTTP server) runs here...
// On exit
defer w.Shutdown()
}
```
## Options
| Option | Description | Default |
|-------------------------|--------------------------------------------------|------------------------|
| `WithStream(name)` | JetStream stream name | required |
| `WithConsumer(name, subject)` | Consumer name and filter subject | required |
| `WithDurable(name)` | Durable consumer name (if different from consumer) | uses consumer name |
| `WithHandler(h)` | Message processing function | required |
| `WithConcurrency(n)` | Number of concurrent goroutines | 1 |
| `WithAckWait(d)` | Time to wait for acknowledgment | 30s |
| `WithMaxDeliver(n)` | Maximum delivery attempts | 1 |
| `WithLogger(l)` | Custom slog logger | slog.Default() |
| `WithStorage(t)` | Stream storage type (MemoryStorage or FileStorage) | FileStorage (safe) |
## Storage Types
- `nats.MemoryStorage` — Fast, in-memory only (lost on restart). Great for dev/testing.
- `nats.FileStorage` — Persistent on disk (survives restarts). Recommended for production.

11
worker/errors.go Normal file
View File

@@ -0,0 +1,11 @@
package worker
import "errors"
var (
ErrInvalidHandler = errors.New("handler is nil")
ErrStreamNotFound = errors.New("stream not found")
ErrConsumerNotFound = errors.New("consumer not found")
ErrStreamNameRequired = errors.New("stream name required")
ErrSubjectRequired = errors.New("subject required")
)

67
worker/options.go Normal file
View File

@@ -0,0 +1,67 @@
package worker
import (
"context"
"log/slog"
"time"
"github.com/nats-io/nats.go"
)
type Handler func(context.Context, *nats.Msg) error
type Option func(*config)
type config struct {
streamName string
consumerName string
durableName string
handler Handler
concurrency int
ackWait time.Duration
maxDeliver int
filterSubject string
deliverPolicy nats.DeliverPolicy
replayPolicy nats.ReplayPolicy
log *slog.Logger
storage nats.StorageType
}
func WithStream(name string) Option {
return func(c *config) { c.streamName = name }
}
func WithConsumer(consumer, subject string) Option {
return func(c *config) {
c.consumerName = consumer
c.filterSubject = subject
}
}
func WithDurable(name string) Option {
return func(c *config) { c.durableName = name }
}
func WithHandler(h Handler) Option {
return func(c *config) { c.handler = h }
}
func WithConcurrency(n int) Option {
return func(c *config) { c.concurrency = n }
}
func WithAckWait(d time.Duration) Option {
return func(c *config) { c.ackWait = d }
}
func WithMaxDeliver(n int) Option {
return func(c *config) { c.maxDeliver = n }
}
func WithLogger(l *slog.Logger) Option {
return func(c *config) { c.log = l }
}
func WithStorage(storage nats.StorageType) Option {
return func(c *config) { c.storage = storage }
}

172
worker/worker.go Normal file
View File

@@ -0,0 +1,172 @@
package worker
import (
"context"
"errors"
"log/slog"
"os"
"os/signal"
"sync"
"syscall"
"time"
"github.com/nats-io/nats.go"
)
type Worker struct {
js nats.JetStreamContext
sub *nats.Subscription
handler Handler
log *slog.Logger
concurrency int
}
func New(nc *nats.Conn, opts ...Option) (*Worker, error) {
cfg := &config{
concurrency: 1,
ackWait: 30 * time.Second,
maxDeliver: 1,
deliverPolicy: nats.DeliverAllPolicy,
replayPolicy: nats.ReplayInstantPolicy,
log: slog.Default(),
storage: nats.FileStorage,
}
for _, opt := range opts {
opt(cfg)
}
if cfg.handler == nil {
return nil, ErrInvalidHandler
}
if cfg.streamName == "" {
return nil, ErrStreamNameRequired
}
if cfg.filterSubject == "" {
return nil, ErrSubjectRequired
}
js, err := nc.JetStream()
if err != nil {
return nil, err
}
w := &Worker{
js: js,
log: cfg.log,
handler: cfg.handler,
concurrency: cfg.concurrency,
}
_, err = js.StreamInfo(cfg.streamName)
if err != nil {
if errors.Is(err, nats.ErrStreamNotFound) {
w.log.Info("creating stream", "name", cfg.streamName)
_, err = js.AddStream(&nats.StreamConfig{
Name: cfg.streamName,
Subjects: []string{cfg.filterSubject + ".>"},
Storage: cfg.storage,
})
if err != nil {
return nil, err
}
} else {
return nil, err
}
}
if cfg.consumerName == "" {
cfg.consumerName = cfg.durableName
}
consumerCfg := &nats.ConsumerConfig{
Durable: cfg.durableName,
AckPolicy: nats.AckExplicitPolicy,
AckWait: cfg.ackWait,
MaxDeliver: cfg.maxDeliver,
FilterSubject: cfg.filterSubject,
DeliverPolicy: cfg.deliverPolicy,
ReplayPolicy: cfg.replayPolicy,
}
_, err = js.AddConsumer(cfg.streamName, consumerCfg)
if err != nil {
return nil, err
}
w.sub, err = js.PullSubscribe(cfg.filterSubject, cfg.consumerName, nats.BindStream(cfg.streamName))
if err != nil {
return nil, err
}
return w, nil
}
func (w *Worker) Run() {
w.log.Info("background worker started")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-sigs
w.log.Info("shutting down background worker")
cancel()
}()
var wg sync.WaitGroup
for i := 0; i < w.concurrency; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
w.processMessages(ctx, id)
}(i)
}
wg.Wait()
w.log.Info("background worker stopped")
}
func (w *Worker) processMessages(ctx context.Context, workerID int) {
batchSize := 10
for {
select {
case <-ctx.Done():
return
default:
msgs, err := w.sub.Fetch(
batchSize,
nats.Context(ctx),
nats.MaxWait(5*time.Second),
)
if err != nil {
if errors.Is(err, nats.ErrTimeout) {
continue
}
w.log.Error("error fetching messages", "error", err)
time.Sleep(time.Second)
continue
}
for _, msg := range msgs {
if err := w.handler(ctx, msg); err != nil {
w.log.Error("error processing message", "error", err)
msg.Nak()
} else {
msg.Ack()
}
}
}
}
}
func (w *Worker) Shutdown() {
if w.sub != nil {
w.sub.Drain()
}
}