Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 13 additions & 4 deletions backend-go/CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ internal/
│ │ ├── sqln/ # SQL nesting (GroupRows for LEFT JOIN flattening)
│ │ └── pglock/ # PostgreSQL advisory locks
│ └── redis/ # Redis client
├── libs/ # crypto, errutil, fn, logutil
├── libs/ # crypto, errutil, fn, logutil, cache, jitter, requestid
├── server/
│ ├── api/ # Endpoint implementations + DI wiring
│ │ ├── shared/ # Shared types (Error, ValidationError) + ErrorHandler
Expand All @@ -54,7 +54,8 @@ internal/
├── services/ # Shared business logic (auth, permission, kms, ...)
└── keystore/ # Redis key-value operations
tests/ # Integration tests (external test packages)
├── infra/ # Test infrastructure (testcontainers, helpers)
├── infra/ # Test infrastructure (testcontainers, HTTP client)
│ └── nodejs/ # Node.js backend seed facade (svc.For(t), domain builders)
├── platform/ # Platform service tests
│ ├── auth/ # Authentication handler tests
│ ├── externalkms/ # External KMS (AWS/GCP) tests
Expand All @@ -63,10 +64,18 @@ tests/ # Integration tests (external test packages)
│ ├── permission/ # Permission system tests
│ ├── projects/ # Projects handler tests
│ └── ratelimit/ # Rate limiting tests
└── secretmanager/
└── secrets/ # Secrets API tests (list, get, permissions)
└── secrets/
└── secrets/ # Secrets API tests (list, get, permissions, cache, v3, service token)
```

**Test seed data**: create projects/identities/secrets/etc. via the `tests/infra/nodejs`
facade — `api := stack.NodeJS().For(t)`, then domain builders like
`api.Secrets.Create(projectID, env, key, value).Comment("...").Do()`. Parameterized
endpoints are builders (required args in the constructor, optional setters, terminal
`Do()`) so new params don't break call sites. Each domain file co-locates its request/
response models with its endpoints. The `nodejs` package imports nothing from `infra`
(infra wires it via `nodejs.Start`/`Bootstrap`/`AttachDB`), keeping the dependency acyclic.

**Two tiers:**
- `server/api/` — DI wiring + endpoint implementations. Handlers 1:1 with endpoints.
- `services/` — Business logic, uses `pg.DB` directly.
Expand Down
11 changes: 6 additions & 5 deletions backend-go/cmd/infisical/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"github.com/infisical/api/internal/libs/logutil"
"github.com/infisical/api/internal/queue"
"github.com/infisical/api/internal/server"
"github.com/infisical/api/internal/server/api"
"github.com/infisical/api/internal/services"
)

func main() {
Expand Down Expand Up @@ -74,7 +74,7 @@ func run(cfg *config.Config, logger *slog.Logger) error {
defer errutil.DeferErr(ctx, redisClient.Close, "closing redis")

// Initialize KeyStore and Queue.
ks := keystore.NewKeyStore(redisClient)
ks := keystore.NewKeyStore(redisClient, db)
queueSvc := queue.NewService(ctx, logger, redisClient)
defer errutil.DeferErr(ctx, queueSvc.Close, "closing queue")

Expand Down Expand Up @@ -110,7 +110,7 @@ func run(cfg *config.Config, logger *slog.Logger) error {
hsmSvc = hsmService
}

services, cleanup, err := api.NewServices(ctx, &api.Infra{
infra := &services.Infra{
Logger: logger,
Config: cfg,
DB: db,
Expand All @@ -119,15 +119,16 @@ func run(cfg *config.Config, logger *slog.Logger) error {
License: licenseSvc,
KeyStore: ks,
Queue: queueSvc,
})
}
svc, cleanup, err := services.New(ctx, infra)
if err != nil {
logger.ErrorContext(ctx, "failed to initialize services", slog.Any("error", err))
return err
}
defer cleanup()

// Create server.
srv := server.NewServer(services, cfg, logger)
srv := server.NewServer(svc, cfg, logger)

// Create error channel for signal handling and server errors.
// Buffered to prevent blocking if multiple senders (signal, queue, HTTP) fire after first receive.
Expand Down
111 changes: 88 additions & 23 deletions backend-go/internal/keystore/keystore.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,15 @@ import (
"errors"
"time"

"github.com/jackc/pgx/v5"
"github.com/redis/go-redis/v9"

"github.com/infisical/api/internal/database/pg"
)

// KeyStore provides key-value operations backed by Redis.
// KeyStore provides key-value operations backed by Redis and PostgreSQL.
type KeyStore interface {
// Redis operations
SetItem(ctx context.Context, key string, value string) error
GetItem(ctx context.Context, key string) (string, error)
SetExpiry(ctx context.Context, key string, expiry time.Duration) (bool, error)
Expand All @@ -19,62 +23,123 @@ type KeyStore interface {
DeleteItems(ctx context.Context, keys []string) (int64, error)
IncrementBy(ctx context.Context, key string, value int64) (int64, error)

// IncrementByWithExpiry atomically increments a key and sets its expiry.
// If the key doesn't exist, it's created with value 0 before incrementing.
IncrementByWithExpiry(ctx context.Context, key string, value int64, expiry time.Duration) (int64, error)

// HashGet returns the value of a field in a hash (HGET).
// Returns empty string if key or field doesn't exist.
HashGet(ctx context.Context, key, field string) (string, error)

// HashSet sets a field in a hash (HSET).
HashSet(ctx context.Context, key, field, value string) error

// StreamAdd adds an entry to a Redis stream (XADD).
// Pass "*" as id to auto-generate the entry ID.
StreamAdd(ctx context.Context, stream string, id string, values map[string]string) (string, error)

// PostgreSQL key_value_store operations

// PgGetIntItem returns the integer value for a key from the PostgreSQL key_value_store table.
// Returns 0 if key doesn't exist or is expired.
PgGetIntItem(ctx context.Context, key string) (int64, error)
}

type redisKeyStore struct {
client redis.UniversalClient
type keyStore struct {
redis redis.UniversalClient
db pg.DB

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Here we define db, but this is actually Postgres right? Maybe we could have it as postgres instead of db, since the interface supports any db, so if in the future we need another db I believe it is more straightforward

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure about another database. I used pg.DB because of two reasons.

  1. This is something used everywhere in the app. So most of them will get it as pg itself.
  2. I feel it would be strange to call is pg.Postgres

}

func NewKeyStore(client redis.UniversalClient) KeyStore {
return &redisKeyStore{client: client}
func NewKeyStore(redisClient redis.UniversalClient, db pg.DB) KeyStore {
return &keyStore{redis: redisClient, db: db}
}

func (k *redisKeyStore) SetItem(ctx context.Context, key, value string) error {
return k.client.Set(ctx, key, value, 0).Err()
func (k *keyStore) SetItem(ctx context.Context, key, value string) error {
return k.redis.Set(ctx, key, value, 0).Err()
}

func (k *redisKeyStore) GetItem(ctx context.Context, key string) (string, error) {
val, err := k.client.Get(ctx, key).Result()
func (k *keyStore) GetItem(ctx context.Context, key string) (string, error) {
val, err := k.redis.Get(ctx, key).Result()
if errors.Is(err, redis.Nil) {
return "", nil
}
return val, err
}

func (k *redisKeyStore) SetExpiry(ctx context.Context, key string, expiry time.Duration) (bool, error) {
return k.client.Expire(ctx, key, expiry).Result()
func (k *keyStore) SetExpiry(ctx context.Context, key string, expiry time.Duration) (bool, error) {
return k.redis.Expire(ctx, key, expiry).Result()
}

func (k *redisKeyStore) SetItemWithExpiry(ctx context.Context, key string, expiry time.Duration, value string) error {
return k.client.Set(ctx, key, value, expiry).Err()
func (k *keyStore) SetItemWithExpiry(ctx context.Context, key string, expiry time.Duration, value string) error {
return k.redis.Set(ctx, key, value, expiry).Err()
}

func (k *redisKeyStore) SetItemWithExpiryNX(ctx context.Context, key string, expiry time.Duration, value string) (bool, error) {
return k.client.SetNX(ctx, key, value, expiry).Result()
func (k *keyStore) SetItemWithExpiryNX(ctx context.Context, key string, expiry time.Duration, value string) (bool, error) {
return k.redis.SetNX(ctx, key, value, expiry).Result()
}

func (k *redisKeyStore) DeleteItem(ctx context.Context, key string) (int64, error) {
return k.client.Del(ctx, key).Result()
func (k *keyStore) DeleteItem(ctx context.Context, key string) (int64, error) {
return k.redis.Del(ctx, key).Result()
}

func (k *redisKeyStore) DeleteItems(ctx context.Context, keys []string) (int64, error) {
func (k *keyStore) DeleteItems(ctx context.Context, keys []string) (int64, error) {
if len(keys) == 0 {
return 0, nil
}
return k.client.Del(ctx, keys...).Result()
return k.redis.Del(ctx, keys...).Result()
}

func (k *keyStore) IncrementBy(ctx context.Context, key string, value int64) (int64, error) {
return k.redis.IncrBy(ctx, key, value).Result()
}

func (k *keyStore) IncrementByWithExpiry(ctx context.Context, key string, value int64, expiry time.Duration) (int64, error) {
pipe := k.redis.TxPipeline()
incrCmd := pipe.IncrBy(ctx, key, value)
pipe.Expire(ctx, key, expiry)
_, err := pipe.Exec(ctx)
if err != nil {
return 0, err
}
return incrCmd.Val(), nil
}

func (k *redisKeyStore) IncrementBy(ctx context.Context, key string, value int64) (int64, error) {
return k.client.IncrBy(ctx, key, value).Result()
func (k *keyStore) HashGet(ctx context.Context, key, field string) (string, error) {
val, err := k.redis.HGet(ctx, key, field).Result()
if errors.Is(err, redis.Nil) {
return "", nil
}
return val, err
}

func (k *keyStore) HashSet(ctx context.Context, key, field, value string) error {
return k.redis.HSet(ctx, key, field, value).Err()
}

func (k *redisKeyStore) StreamAdd(ctx context.Context, stream, id string, values map[string]string) (string, error) {
return k.client.XAdd(ctx, &redis.XAddArgs{
func (k *keyStore) StreamAdd(ctx context.Context, stream, id string, values map[string]string) (string, error) {
return k.redis.XAdd(ctx, &redis.XAddArgs{
Stream: stream,
ID: id,
Values: values,
}).Result()
}

func (k *keyStore) PgGetIntItem(ctx context.Context, key string) (int64, error) {
var integerValue *int64
err := k.db.Replica().QueryRow(ctx, `
SELECT "integerValue"
FROM key_value_store
WHERE key = @key
AND ("expiresAt" IS NULL OR "expiresAt" > NOW())
Comment on lines +129 to +133

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question: one pattern that I see a lot on our knex queries is that if we receive a transaction we hit the primary database instead of the replica.

I saw the usage of this and it seems taht we are always hitting the replica (pgGetIntItem on keystore.ts) but the increment (pgIncrementBy.ts can receive a transaction, which will bump the wrong one). In my opinion, this should always query the main database, so we know that a cache was invalidated.

`, pgx.NamedArgs{"key": key}).Scan(&integerValue)
if err != nil {
if errors.Is(err, pgx.ErrNoRows) {
return 0, nil
}
return 0, err
}
if integerValue == nil {
return 0, nil
}
return *integerValue, nil
}
Loading
Loading