feat: Add initial implementation of Vulnerability Resolver Jobs
Some checks failed
Docs CI / lint-and-preview (push) Has been cancelled

- Created project for StellaOps.Scanner.Analyzers.Native.Tests with necessary dependencies.
- Documented roles and guidelines in AGENTS.md for Scheduler module.
- Implemented IResolverJobService interface and InMemoryResolverJobService for handling resolver jobs.
- Added ResolverBacklogNotifier and ResolverBacklogService for monitoring job metrics.
- Developed API endpoints for managing resolver jobs and retrieving metrics.
- Defined models for resolver job requests and responses.
- Integrated dependency injection for resolver job services.
- Implemented ImpactIndexSnapshot for persisting impact index data.
- Introduced SignalsScoringOptions for configurable scoring weights in reachability scoring.
- Added unit tests for ReachabilityScoringService and RuntimeFactsIngestionService.
- Created dotnet-filter.sh script to handle command-line arguments for dotnet.
- Established nuget-prime project for managing package downloads.
This commit is contained in:
master
2025-11-18 07:52:15 +02:00
parent e69b57d467
commit 8355e2ff75
299 changed files with 13293 additions and 2444 deletions

View File

@@ -0,0 +1,9 @@
# Worker SDK (Go) — Task Tracker
| Task ID | Status | Notes | Updated (UTC) |
| --- | --- | --- | --- |
| WORKER-GO-32-001 | DONE | Initial Go SDK scaffold with config binding, auth headers, claim/ack client, smoke sample, and unit tests. | 2025-11-17 |
| WORKER-GO-32-002 | DONE | Heartbeat/progress helpers, logging hooks, metrics, and jittered retries. | 2025-11-17 |
| WORKER-GO-33-001 | DONE | Artifact publish helpers, checksum hashing, metadata payload, idempotency guard. | 2025-11-17 |
| WORKER-GO-33-002 | DONE | Error classification/backoff helpers and structured failure reporting. | 2025-11-17 |
| WORKER-GO-34-001 | DONE | Backfill range execution helpers, watermark handshake, artifact dedupe verification. | 2025-11-17 |

View File

@@ -0,0 +1,45 @@
package main
import (
"context"
"log"
"time"
"git.stella-ops.org/stellaops/orchestrator/worker-sdk-go/pkg/workersdk"
)
// main demonstrates a minimal worker loop against a local orchestrator:
// claim one job, report heartbeat/progress, then acknowledge completion.
func main() {
	sdk, err := workersdk.New(workersdk.Config{
		BaseURL:   "http://localhost:8080",
		APIKey:    "dev-token",
		TenantID:  "local-tenant",
		ProjectID: "demo-project",
	})
	if err != nil {
		log.Fatalf("configure client: %v", err)
	}

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	lease, err := sdk.Claim(ctx, workersdk.ClaimJobRequest{WorkerID: "demo-worker", Capabilities: []string{"pack-run"}})
	if err != nil {
		log.Fatalf("claim job: %v", err)
	}
	if lease == nil {
		log.Println("no work available")
		return
	}

	// ... perform work using lease.Payload ...

	// Liveness and progress reporting; errors deliberately ignored in this sample.
	_ = sdk.Heartbeat(ctx, lease.JobID, lease.LeaseID)
	_ = sdk.Progress(ctx, lease.JobID, lease.LeaseID, 50, "halfway")

	if err := sdk.Ack(ctx, workersdk.AckJobRequest{JobID: lease.JobID, LeaseID: lease.LeaseID, Status: "succeeded"}); err != nil {
		log.Fatalf("ack job: %v", err)
	}
	log.Printf("acknowledged job %s", lease.JobID)
}

View File

@@ -0,0 +1,3 @@
module git.stella-ops.org/stellaops/orchestrator/worker-sdk-go
go 1.21

View File

@@ -0,0 +1,31 @@
package transport
import (
	"context"
	"net/http"
	"time"
)
// RoundTripper abstracts HTTP transport so we can stub in tests without
// depending on the default client.
type RoundTripper interface {
RoundTrip(*http.Request) (*http.Response, error)
}
// Client wraps an http.Client-like implementation.
type Client interface {
Do(req *http.Request) (*http.Response, error)
}
// DefaultClient returns a minimal http.Client with sane defaults.
func DefaultClient(rt RoundTripper) *http.Client {
if rt == nil {
return &http.Client{}
}
return &http.Client{Transport: rt}
}
// Do wraps an HTTP call using the provided Client.
func Do(ctx context.Context, c Client, req *http.Request) (*http.Response, error) {
req = req.WithContext(ctx)
return c.Do(req)
}

View File

@@ -0,0 +1,66 @@
package workersdk
import (
"context"
"crypto/sha256"
"encoding/hex"
"fmt"
"io"
)
// StorageClient is a minimal interface for artifact storage backends.
type StorageClient interface {
	PutObject(ctx context.Context, key string, body io.Reader, metadata map[string]string) error
}

// ArtifactPublishRequest describes an artifact to upload.
type ArtifactPublishRequest struct {
	JobID          string
	LeaseID        string
	ObjectKey      string
	Content        io.Reader
	ContentLength  int64 // advisory; the response reports the bytes actually streamed
	ContentType    string
	ArtifactType   string
	IdempotencyKey string
	Storage        StorageClient
}

// ArtifactPublishResponse returns checksum metadata.
type ArtifactPublishResponse struct {
	SHA256 string // hex-encoded SHA-256 of the streamed content
	Size   int64  // number of bytes actually streamed to storage
}

// PublishArtifact uploads artifact content with checksum metadata and idempotency guard.
// The content is hashed and counted while streaming, so the reported Size is
// accurate even when ContentLength was left unset by the caller (the previous
// implementation echoed ContentLength verbatim).
func (c *Client) PublishArtifact(ctx context.Context, req ArtifactPublishRequest) (*ArtifactPublishResponse, error) {
	if req.JobID == "" || req.LeaseID == "" {
		return nil, fmt.Errorf("JobID and LeaseID are required")
	}
	if req.ObjectKey == "" {
		return nil, fmt.Errorf("ObjectKey is required")
	}
	if req.Storage == nil {
		return nil, fmt.Errorf("Storage client is required")
	}
	if req.Content == nil {
		return nil, fmt.Errorf("Content reader is required")
	}
	// Compute SHA256 and the true byte count while streaming.
	hasher := sha256.New()
	counter := &countingReader{r: io.TeeReader(req.Content, hasher)}
	metadata := map[string]string{
		"x-stellaops-job-id": req.JobID,
		"x-stellaops-lease":  req.LeaseID,
		"x-stellaops-type":   req.ArtifactType,
		"x-stellaops-ct":     req.ContentType,
	}
	if req.IdempotencyKey != "" {
		metadata["x-idempotency-key"] = req.IdempotencyKey
	}
	if err := req.Storage.PutObject(ctx, req.ObjectKey, counter, metadata); err != nil {
		return nil, err
	}
	sum := hex.EncodeToString(hasher.Sum(nil))
	return &ArtifactPublishResponse{SHA256: sum, Size: counter.n}, nil
}

// countingReader counts bytes passing through an io.Reader.
type countingReader struct {
	r io.Reader
	n int64
}

func (cr *countingReader) Read(p []byte) (int, error) {
	n, err := cr.r.Read(p)
	cr.n += int64(n)
	return n, err
}

View File

@@ -0,0 +1,61 @@
package workersdk
import (
"bytes"
"context"
"io"
"testing"
)
type memStorage struct {
key string
data []byte
metadata map[string]string
}
func (m *memStorage) PutObject(ctx context.Context, key string, body io.Reader, metadata map[string]string) error {
b, err := io.ReadAll(body)
if err != nil {
return err
}
m.key = key
m.data = b
m.metadata = metadata
return nil
}
// TestPublishArtifact verifies checksum/size reporting and metadata
// propagation through the in-memory storage stub.
func TestPublishArtifact(t *testing.T) {
	store := &memStorage{}
	client, err := New(Config{BaseURL: "https://example"})
	if err != nil {
		t.Fatalf("new client: %v", err)
	}
	payload := []byte("hello")
	req := ArtifactPublishRequest{
		JobID:          "job1",
		LeaseID:        "lease1",
		ObjectKey:      "artifacts/job1/output.txt",
		Content:        bytes.NewReader(payload),
		ContentLength:  int64(len(payload)),
		ContentType:    "text/plain",
		ArtifactType:   "log",
		IdempotencyKey: "idem-1",
		Storage:        store,
	}
	resp, err := client.PublishArtifact(context.Background(), req)
	if err != nil {
		t.Fatalf("publish: %v", err)
	}
	switch {
	case resp.SHA256 == "" || resp.Size != 5:
		t.Fatalf("unexpected resp: %+v", resp)
	case store.key != "artifacts/job1/output.txt":
		t.Fatalf("key mismatch: %s", store.key)
	case store.metadata["x-idempotency-key"] != "idem-1":
		t.Fatalf("idempotency missing")
	case store.metadata["x-stellaops-job-id"] != "job1":
		t.Fatalf("job metadata missing")
	}
}

View File

@@ -0,0 +1,86 @@
package workersdk
import (
"context"
"fmt"
"time"
)
// Range represents an inclusive backfill window.
type Range struct {
Start time.Time
End time.Time
}
// Validate ensures start <= end.
func (r Range) Validate() error {
if r.End.Before(r.Start) {
return fmt.Errorf("range end before start")
}
return nil
}
// WatermarkHandshake ensures the worker's view of the watermark matches orchestrator-provided value.
type WatermarkHandshake struct {
Expected string
Current string
}
func (w WatermarkHandshake) Validate() error {
if w.Expected == "" {
return fmt.Errorf("expected watermark required")
}
if w.Expected != w.Current {
return fmt.Errorf("watermark mismatch")
}
return nil
}
// Deduper tracks processed artifact digests to prevent duplicate publication.
type Deduper struct {
seen map[string]struct{}
}
// NewDeduper creates a deduper.
func NewDeduper() *Deduper {
return &Deduper{seen: make(map[string]struct{})}
}
// Seen returns true if digest already processed; marks new digests.
func (d *Deduper) Seen(digest string) bool {
if digest == "" {
return false
}
if _, ok := d.seen[digest]; ok {
return true
}
d.seen[digest] = struct{}{}
return false
}
// ExecuteRange iterates [start,end] by step days, invoking fn for each day.
func ExecuteRange(ctx context.Context, r Range, step time.Duration, fn func(context.Context, time.Time) error) error {
if err := r.Validate(); err != nil {
return err
}
if step <= 0 {
return fmt.Errorf("step must be positive")
}
for ts := r.Start; !ts.After(r.End); ts = ts.Add(step) {
if err := fn(ctx, ts); err != nil {
return err
}
}
return nil
}
// VerifyAndPublishArtifact wraps PublishArtifact with dedupe and watermark guard.
func (c *Client) VerifyAndPublishArtifact(ctx context.Context, wm WatermarkHandshake, dedupe *Deduper, req ArtifactPublishRequest) (*ArtifactPublishResponse, error) {
	if err := wm.Validate(); err != nil {
		return nil, err
	}
	duplicate := dedupe != nil && dedupe.Seen(req.IdempotencyKey)
	if duplicate {
		return nil, fmt.Errorf("duplicate artifact idempotency key")
	}
	return c.PublishArtifact(ctx, req)
}

View File

@@ -0,0 +1,85 @@
package workersdk
import (
"bytes"
"context"
"io"
"testing"
"time"
)
// stubStorage is a no-op StorageClient for tests that only exercise guards.
type stubStorage struct{}

func (stubStorage) PutObject(context.Context, string, io.Reader, map[string]string) error {
	return nil
}

// TestRangeValidation rejects a window whose end precedes its start.
func TestRangeValidation(t *testing.T) {
	now := time.Now()
	invalid := Range{Start: now, End: now.Add(-time.Hour)}
	if invalid.Validate() == nil {
		t.Fatalf("expected error for invalid range")
	}
}

// TestExecuteRange counts one invocation per day over an inclusive 48h window.
func TestExecuteRange(t *testing.T) {
	begin := time.Date(2025, 11, 15, 0, 0, 0, 0, time.UTC)
	window := Range{Start: begin, End: begin.Add(48 * time.Hour)}
	var invocations int
	err := ExecuteRange(context.Background(), window, 24*time.Hour, func(context.Context, time.Time) error {
		invocations++
		return nil
	})
	if err != nil {
		t.Fatalf("execute range: %v", err)
	}
	if invocations != 3 {
		t.Fatalf("expected 3 calls, got %d", invocations)
	}
}

// TestWatermarkMismatch rejects differing expected/current watermarks.
func TestWatermarkMismatch(t *testing.T) {
	if (WatermarkHandshake{Expected: "abc", Current: "def"}).Validate() == nil {
		t.Fatal("expected mismatch error")
	}
}

// TestDeduper verifies first-seen vs. repeat detection.
func TestDeduper(t *testing.T) {
	dd := NewDeduper()
	if dd.Seen("sha") {
		t.Fatal("should be new")
	}
	if !dd.Seen("sha") {
		t.Fatal("should detect duplicate")
	}
}

// TestVerifyAndPublishArtifactDuplicate exercises the idempotency guard.
func TestVerifyAndPublishArtifactDuplicate(t *testing.T) {
	dd := NewDeduper()
	cli, _ := New(Config{BaseURL: "https://x"})
	dd.Seen("idem1")
	req := ArtifactPublishRequest{IdempotencyKey: "idem1", Storage: stubStorage{}, JobID: "j", LeaseID: "l", ObjectKey: "k", Content: bytes.NewReader([]byte{}), ArtifactType: "log"}
	if _, err := cli.VerifyAndPublishArtifact(context.Background(), WatermarkHandshake{Expected: "w", Current: "w"}, dd, req); err == nil {
		t.Fatal("expected duplicate error")
	}
}

// TestVerifyAndPublishArtifactWatermark exercises the watermark guard.
func TestVerifyAndPublishArtifactWatermark(t *testing.T) {
	dd := NewDeduper()
	cli, _ := New(Config{BaseURL: "https://x"})
	req := ArtifactPublishRequest{IdempotencyKey: "idem2", Storage: stubStorage{}, JobID: "j", LeaseID: "l", ObjectKey: "k", Content: bytes.NewReader([]byte{}), ArtifactType: "log"}
	if _, err := cli.VerifyAndPublishArtifact(context.Background(), WatermarkHandshake{Expected: "w1", Current: "w2"}, dd, req); err == nil {
		t.Fatal("expected watermark error")
	}
}

View File

@@ -0,0 +1,243 @@
package workersdk
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"path"
"time"
"git.stella-ops.org/stellaops/orchestrator/worker-sdk-go/internal/transport"
)
// Client provides job claim/acknowledge operations.
// None of the methods shown in this file mutate these fields after New,
// so a single Client may be shared if the configured transport.Client is
// itself safe for concurrent use.
type Client struct {
	baseURL   *url.URL         // parsed Config.BaseURL; endpoints are resolved against it
	apiKey    string           // bearer token sent as Authorization when non-empty
	tenantID  string           // sent as X-StellaOps-Tenant when non-empty
	projectID string           // sent as X-StellaOps-Project when non-empty
	userAgent string           // User-Agent header value
	http      transport.Client // HTTP executor; defaulted by Config.httpClient
	logger    Logger           // never nil; NoopLogger by default
	metrics   MetricsSink      // never nil; NoopMetrics by default
}
// New creates a configured Client. It validates the config, parses the
// base URL, and fills in a default User-Agent when none was supplied.
func New(cfg Config) (*Client, error) {
	if err := cfg.validate(); err != nil {
		return nil, err
	}
	base, err := url.Parse(cfg.BaseURL)
	if err != nil {
		return nil, fmt.Errorf("invalid BaseURL: %w", err)
	}
	agent := cfg.UserAgent
	if agent == "" {
		agent = "stellaops-worker-sdk-go/0.1"
	}
	c := &Client{
		baseURL:   base,
		apiKey:    cfg.APIKey,
		tenantID:  cfg.TenantID,
		projectID: cfg.ProjectID,
		userAgent: agent,
		http:      cfg.httpClient(),
		logger:    cfg.logger(),
		metrics:   cfg.metrics(),
	}
	return c, nil
}
// ClaimJobRequest represents a worker's desire to lease a job.
type ClaimJobRequest struct {
	// WorkerID identifies the claiming worker; required by Claim.
	WorkerID string `json:"worker_id"`
	// Capabilities optionally limits which job types may be leased.
	Capabilities []string `json:"capabilities,omitempty"`
}

// ClaimJobResponse returns the leased job payload.
type ClaimJobResponse struct {
	JobID   string `json:"job_id"`
	LeaseID string `json:"lease_id"`
	// ExpiresAt is the lease deadline; presumably renewed via Heartbeat —
	// TODO(review): confirm renewal semantics with the orchestrator API.
	ExpiresAt time.Time `json:"expires_at"`
	JobType   string    `json:"job_type"`
	// Payload is left as raw JSON so workers decode it per JobType.
	Payload json.RawMessage `json:"payload"`
	// RetryAfter is a server-suggested poll delay in seconds, per the field name.
	RetryAfter int        `json:"retry_after_seconds,omitempty"`
	NotBefore  *time.Time `json:"not_before,omitempty"`
	TraceID    string     `json:"trace_id,omitempty"`
}

// AckJobRequest represents completion of a job.
type AckJobRequest struct {
	JobID   string `json:"job_id"`
	LeaseID string `json:"lease_id"`
	// Status is the terminal state reported to the orchestrator (e.g. "succeeded");
	// required by Ack.
	Status  string `json:"status"`
	Message string `json:"message,omitempty"`
	// Rotating carries a rotating token — purpose not visible here;
	// TODO(review): confirm its semantics with the orchestrator API.
	Rotating string `json:"rotating_token,omitempty"`
}
// Claim requests the next available job for the worker.
// It returns (nil, nil) when the orchestrator reports no work available.
//
// Fixes over the previous version: the claimed-jobs metric is only
// incremented when a lease was actually granted (a 204 "no work" reply
// used to be counted too), and failures are logged at Error level
// instead of Info.
func (c *Client) Claim(ctx context.Context, req ClaimJobRequest) (*ClaimJobResponse, error) {
	if req.WorkerID == "" {
		return nil, fmt.Errorf("WorkerID is required")
	}
	endpoint := c.resolve("/api/jobs/lease")
	resp, err := c.doClaim(ctx, endpoint, req)
	if err != nil {
		c.logger.Error(ctx, "claim", map[string]any{"worker_id": req.WorkerID, "err": err})
		return nil, err
	}
	if resp != nil {
		// Count only leases actually granted; nil means no work.
		c.metrics.IncClaimed()
	}
	c.logger.Info(ctx, "claim", map[string]any{"worker_id": req.WorkerID, "err": err})
	return resp, nil
}
// Ack acknowledges job completion or failure. Non-2xx responses are turned
// into errors carrying up to 8 KiB of the response body.
func (c *Client) Ack(ctx context.Context, req AckJobRequest) error {
	if req.JobID == "" || req.LeaseID == "" || req.Status == "" {
		return fmt.Errorf("JobID, LeaseID, and Status are required")
	}
	body, err := json.Marshal(req)
	if err != nil {
		return fmt.Errorf("marshal ack request: %w", err)
	}
	target := c.resolve(path.Join("/api/jobs", req.JobID, "ack"))
	request, err := http.NewRequest(http.MethodPost, target, bytes.NewReader(body))
	if err != nil {
		return err
	}
	c.applyHeaders(request)
	request.Header.Set("Content-Type", "application/json")
	resp, err := transport.Do(ctx, c.http, request)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode >= 300 {
		snippet, _ := io.ReadAll(io.LimitReader(resp.Body, 8<<10))
		return fmt.Errorf("ack failed: %s (%s)", resp.Status, string(snippet))
	}
	c.metrics.IncAck(req.Status)
	return nil
}
// Heartbeat reports liveness for a job lease, recording latency on success
// and a failure counter on transport or HTTP errors.
func (c *Client) Heartbeat(ctx context.Context, jobID, leaseID string) error {
	if jobID == "" || leaseID == "" {
		return fmt.Errorf("JobID and LeaseID are required")
	}
	body, _ := json.Marshal(map[string]string{"lease_id": leaseID})
	target := c.resolve(path.Join("/api/jobs", jobID, "heartbeat"))
	request, err := http.NewRequest(http.MethodPost, target, bytes.NewReader(body))
	if err != nil {
		return err
	}
	c.applyHeaders(request)
	request.Header.Set("Content-Type", "application/json")
	began := time.Now()
	resp, err := transport.Do(ctx, c.http, request)
	if err != nil {
		c.metrics.IncHeartbeatFailures()
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode >= 300 {
		snippet, _ := io.ReadAll(io.LimitReader(resp.Body, 8<<10))
		c.metrics.IncHeartbeatFailures()
		return fmt.Errorf("heartbeat failed: %s (%s)", resp.Status, string(snippet))
	}
	c.metrics.ObserveHeartbeatLatency(time.Since(began).Seconds())
	return nil
}
// Progress reports worker progress (0-100) with optional message.
//
// jobID and leaseID are now validated locally, for consistency with Ack
// and Heartbeat, instead of relying on a server-side 4xx for an empty id.
func (c *Client) Progress(ctx context.Context, jobID, leaseID string, pct int, message string) error {
	if jobID == "" || leaseID == "" {
		return fmt.Errorf("JobID and LeaseID are required")
	}
	if pct < 0 || pct > 100 {
		return fmt.Errorf("pct must be 0-100")
	}
	payload, _ := json.Marshal(map[string]any{
		"lease_id": leaseID,
		"progress": pct,
		"message":  message,
	})
	endpoint := c.resolve(path.Join("/api/jobs", jobID, "progress"))
	httpReq, err := http.NewRequest(http.MethodPost, endpoint, bytes.NewReader(payload))
	if err != nil {
		return err
	}
	c.applyHeaders(httpReq)
	httpReq.Header.Set("Content-Type", "application/json")
	resp, err := transport.Do(ctx, c.http, httpReq)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode >= 300 {
		b, _ := io.ReadAll(io.LimitReader(resp.Body, 8<<10))
		return fmt.Errorf("progress failed: %s (%s)", resp.Status, string(b))
	}
	return nil
}
// doClaim POSTs the claim request to endpoint and decodes the lease
// response. A 204 No Content reply means "no work available" and yields
// (nil, nil); any other non-2xx status becomes an error that includes up
// to 8 KiB of the response body.
func (c *Client) doClaim(ctx context.Context, endpoint string, req ClaimJobRequest) (*ClaimJobResponse, error) {
	payload, err := json.Marshal(req)
	if err != nil {
		return nil, fmt.Errorf("marshal claim request: %w", err)
	}
	httpReq, err := http.NewRequest(http.MethodPost, endpoint, bytes.NewReader(payload))
	if err != nil {
		return nil, err
	}
	c.applyHeaders(httpReq)
	httpReq.Header.Set("Content-Type", "application/json")
	resp, err := transport.Do(ctx, c.http, httpReq)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	if resp.StatusCode == http.StatusNoContent {
		return nil, nil // no work available
	}
	if resp.StatusCode >= 300 {
		// Cap the body read so a misbehaving server cannot balloon the error.
		b, _ := io.ReadAll(io.LimitReader(resp.Body, 8<<10))
		return nil, fmt.Errorf("claim failed: %s (%s)", resp.Status, string(b))
	}
	var out ClaimJobResponse
	decoder := json.NewDecoder(resp.Body)
	// NOTE(review): DisallowUnknownFields makes this client reject any new
	// field the server starts emitting, coupling client upgrades to server
	// releases — confirm this strictness is intentional.
	decoder.DisallowUnknownFields()
	if err := decoder.Decode(&out); err != nil {
		return nil, fmt.Errorf("decode claim response: %w", err)
	}
	return &out, nil
}
// applyHeaders stamps auth, tenant/project scoping, and content-negotiation
// headers onto an outgoing request. Empty values are skipped.
func (c *Client) applyHeaders(r *http.Request) {
	auth := ""
	if c.apiKey != "" {
		auth = "Bearer " + c.apiKey
	}
	optional := []struct{ name, value string }{
		{"Authorization", auth},
		{"X-StellaOps-Tenant", c.tenantID},
		{"X-StellaOps-Project", c.projectID},
	}
	for _, h := range optional {
		if h.value != "" {
			r.Header.Set(h.name, h.value)
		}
	}
	r.Header.Set("Accept", "application/json")
	if c.userAgent != "" {
		r.Header.Set("User-Agent", c.userAgent)
	}
}

// resolve joins p onto the configured base URL path without mutating the
// shared baseURL value.
func (c *Client) resolve(p string) string {
	u := *c.baseURL
	u.Path = path.Join(u.Path, p)
	return u.String()
}

View File

@@ -0,0 +1,136 @@
package workersdk
import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
"time"
)
// claimRecorded captures what the fake server saw for the claim call.
type claimRecorded struct {
	Method  string
	Path    string
	Auth    string
	Tenant  string
	Project string
	Body    ClaimJobRequest
}

// ackRecorded captures what the fake server saw for the ack call.
type ackRecorded struct {
	Method string
	Path   string
	Body   AckJobRequest
}

// TestClaimAndAck drives a full claim/heartbeat/progress/ack cycle against
// an httptest server and verifies methods, paths, headers, and payloads.
//
// Fix: the handler previously called t.Fatalf, but httptest handlers run on
// server goroutines and FailNow/Fatalf must only be called from the test
// goroutine (see the testing package docs); use t.Errorf and return instead.
func TestClaimAndAck(t *testing.T) {
	var claimRec claimRecorded
	var ackRec ackRecorded
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		switch r.URL.Path {
		case "/api/jobs/lease":
			claimRec.Method = r.Method
			claimRec.Path = r.URL.Path
			claimRec.Auth = r.Header.Get("Authorization")
			claimRec.Tenant = r.Header.Get("X-StellaOps-Tenant")
			claimRec.Project = r.Header.Get("X-StellaOps-Project")
			if err := json.NewDecoder(r.Body).Decode(&claimRec.Body); err != nil {
				t.Errorf("decode claim: %v", err)
				return
			}
			resp := ClaimJobResponse{
				JobID:     "123",
				LeaseID:   "lease-1",
				ExpiresAt: time.Date(2025, 11, 17, 0, 0, 0, 0, time.UTC),
				JobType:   "demo",
				Payload:   json.RawMessage(`{"key":"value"}`),
			}
			w.Header().Set("Content-Type", "application/json")
			if err := json.NewEncoder(w).Encode(resp); err != nil {
				t.Errorf("encode claim response: %v", err)
			}
		case "/api/jobs/123/heartbeat":
			w.WriteHeader(http.StatusAccepted)
		case "/api/jobs/123/progress":
			w.WriteHeader(http.StatusAccepted)
		default:
			ackRec.Method = r.Method
			ackRec.Path = r.URL.Path
			if err := json.NewDecoder(r.Body).Decode(&ackRec.Body); err != nil {
				t.Errorf("decode ack: %v", err)
				return
			}
			w.WriteHeader(http.StatusAccepted)
		}
	}))
	defer srv.Close()
	client, err := New(Config{
		BaseURL:   srv.URL,
		APIKey:    "token-1",
		TenantID:  "tenant-a",
		ProjectID: "project-1",
	})
	if err != nil {
		t.Fatalf("new client: %v", err)
	}
	ctx := context.Background()
	claimResp, err := client.Claim(ctx, ClaimJobRequest{WorkerID: "worker-7", Capabilities: []string{"scan"}})
	if err != nil {
		t.Fatalf("claim: %v", err)
	}
	if claimRec.Method != http.MethodPost || claimRec.Path != "/api/jobs/lease" {
		t.Fatalf("unexpected claim method/path: %s %s", claimRec.Method, claimRec.Path)
	}
	if claimRec.Auth != "Bearer token-1" {
		t.Fatalf("auth header mismatch: %s", claimRec.Auth)
	}
	if claimRec.Tenant != "tenant-a" || claimRec.Project != "project-1" {
		t.Fatalf("tenant/project headers missing: %s %s", claimRec.Tenant, claimRec.Project)
	}
	if claimRec.Body.WorkerID != "worker-7" {
		t.Fatalf("worker id missing")
	}
	if claimResp == nil || claimResp.JobID != "123" || claimResp.LeaseID != "lease-1" {
		t.Fatalf("claim response mismatch: %+v", claimResp)
	}
	err = client.Ack(ctx, AckJobRequest{JobID: claimResp.JobID, LeaseID: claimResp.LeaseID, Status: "succeeded"})
	if err != nil {
		t.Fatalf("ack error: %v", err)
	}
	if err := client.Heartbeat(ctx, claimResp.JobID, claimResp.LeaseID); err != nil {
		t.Fatalf("heartbeat error: %v", err)
	}
	if err := client.Progress(ctx, claimResp.JobID, claimResp.LeaseID, 50, "halfway"); err != nil {
		t.Fatalf("progress error: %v", err)
	}
	if ackRec.Method != http.MethodPost {
		t.Fatalf("ack method mismatch: %s", ackRec.Method)
	}
	if ackRec.Path != "/api/jobs/123/ack" {
		t.Fatalf("ack path mismatch: %s", ackRec.Path)
	}
	if ackRec.Body.Status != "succeeded" || ackRec.Body.JobID != "123" {
		t.Fatalf("ack body mismatch: %+v", ackRec.Body)
	}
}
// TestClaimMissingWorker ensures Claim rejects an empty worker id locally,
// before any request is made.
func TestClaimMissingWorker(t *testing.T) {
	c, err := New(Config{BaseURL: "https://example.invalid"})
	if err != nil {
		t.Fatalf("new client: %v", err)
	}
	_, err = c.Claim(context.Background(), ClaimJobRequest{})
	if err == nil {
		t.Fatal("expected error for missing worker id")
	}
}

// TestConfigValidation ensures the BaseURL requirement is enforced by New.
func TestConfigValidation(t *testing.T) {
	if _, err := New(Config{}); err == nil {
		t.Fatal("expected error for missing base url")
	}
}

View File

@@ -0,0 +1,49 @@
package workersdk
import (
"errors"
"net/http"
"strings"
"git.stella-ops.org/stellaops/orchestrator/worker-sdk-go/internal/transport"
)
// Config holds SDK configuration. Only BaseURL is required; every other
// field has a sensible default applied by New.
type Config struct {
	BaseURL   string           // required; orchestrator root URL
	APIKey    string           // optional bearer token for the Authorization header
	TenantID  string           // optional; sent as X-StellaOps-Tenant
	ProjectID string           // optional; sent as X-StellaOps-Project
	UserAgent string           // optional; defaulted in New when empty
	Client    transport.Client // optional HTTP client override; defaulted when nil
	Logger    Logger           // optional; NoopLogger when nil
	Metrics   MetricsSink      // optional; NoopMetrics when nil
}
// validate rejects configs whose BaseURL is empty or whitespace-only.
func (c *Config) validate() error {
	if strings.TrimSpace(c.BaseURL) == "" {
		return errors.New("BaseURL is required")
	}
	return nil
}

// httpClient returns the configured transport, or a default client when unset.
func (c *Config) httpClient() transport.Client {
	if c.Client == nil {
		return transport.DefaultClient(http.DefaultTransport)
	}
	return c.Client
}

// logger returns the configured logger, falling back to a no-op.
func (c *Config) logger() Logger {
	if c.Logger == nil {
		return NoopLogger{}
	}
	return c.Logger
}

// metrics returns the configured sink, falling back to a no-op.
func (c *Config) metrics() MetricsSink {
	if c.Metrics == nil {
		return NoopMetrics{}
	}
	return c.Metrics
}

View File

@@ -0,0 +1,29 @@
package workersdk
// ErrorCode represents orchestrator error categories.
type ErrorCode string

const (
	ErrorCodeTemporary  ErrorCode = "temporary"
	ErrorCodePermanent  ErrorCode = "permanent"
	ErrorCodeFatal      ErrorCode = "fatal"
	ErrorCodeUnauth     ErrorCode = "unauthorized"
	ErrorCodeQuota      ErrorCode = "quota_exceeded"
	ErrorCodeValidation ErrorCode = "validation"
)

// ErrorClassification maps HTTP status to codes and retryability.
// Auth failures are checked first, then server errors (retryable), then
// rate limiting (retryable), then remaining 4xx (permanent); anything
// else is unclassified and not retryable.
func ErrorClassification(status int) (ErrorCode, bool) {
	if status == 401 || status == 403 {
		return ErrorCodeUnauth, false
	}
	if status >= 500 && status < 600 {
		return ErrorCodeTemporary, true
	}
	if status == 429 {
		return ErrorCodeQuota, true
	}
	if status >= 400 && status < 500 {
		return ErrorCodePermanent, false
	}
	return "", false
}

View File

@@ -0,0 +1,24 @@
package workersdk
import "testing"
// TestErrorClassification checks the status → (code, retryable) table.
func TestErrorClassification(t *testing.T) {
	for _, tc := range []struct {
		status int
		code   ErrorCode
		retry  bool
	}{
		{500, ErrorCodeTemporary, true},
		{503, ErrorCodeTemporary, true},
		{429, ErrorCodeQuota, true},
		{401, ErrorCodeUnauth, false},
		{400, ErrorCodePermanent, false},
		{404, ErrorCodePermanent, false},
	} {
		gotCode, gotRetry := ErrorClassification(tc.status)
		if gotCode != tc.code || gotRetry != tc.retry {
			t.Fatalf("status %d -> got %s retry %v", tc.status, gotCode, gotRetry)
		}
	}
}

View File

@@ -0,0 +1,15 @@
package workersdk
import "context"
// Logger is a minimal structured logger interface.
type Logger interface {
	// Info records an informational event with structured fields.
	Info(ctx context.Context, msg string, fields map[string]any)
	// Error records a failure event with structured fields.
	Error(ctx context.Context, msg string, fields map[string]any)
}

// NoopLogger is used when no logger provided.
type NoopLogger struct{}

// Info discards the event.
func (NoopLogger) Info(_ context.Context, _ string, _ map[string]any) {}

// Error discards the event.
func (NoopLogger) Error(_ context.Context, _ string, _ map[string]any) {}

View File

@@ -0,0 +1,17 @@
package workersdk
// MetricsSink allows callers to wire Prometheus or other metrics systems.
type MetricsSink interface {
	// IncClaimed counts claimed jobs.
	IncClaimed()
	// IncAck counts acknowledgements, labeled by terminal status.
	IncAck(status string)
	// ObserveHeartbeatLatency records a heartbeat round-trip in seconds.
	ObserveHeartbeatLatency(seconds float64)
	// IncHeartbeatFailures counts failed heartbeat calls.
	IncHeartbeatFailures()
}

// NoopMetrics is the default sink when none is provided.
type NoopMetrics struct{}

func (NoopMetrics) IncClaimed()                       {}
func (NoopMetrics) IncAck(_ string)                   {}
func (NoopMetrics) ObserveHeartbeatLatency(_ float64) {}
func (NoopMetrics) IncHeartbeatFailures()             {}

View File

@@ -0,0 +1,53 @@
package workersdk
import (
"context"
"math/rand"
"time"
)
// RetryPolicy defines retry behavior.
type RetryPolicy struct {
MaxAttempts int
BaseDelay time.Duration
MaxDelay time.Duration
Jitter float64 // between 0 and 1, represents percentage of jitter.
}
// DefaultRetryPolicy returns exponential backoff with jitter suitable for worker I/O.
func DefaultRetryPolicy() RetryPolicy {
return RetryPolicy{MaxAttempts: 5, BaseDelay: 200 * time.Millisecond, MaxDelay: 5 * time.Second, Jitter: 0.2}
}
// Retry executes fn with retries according to policy.
func Retry(ctx context.Context, policy RetryPolicy, fn func() error) error {
if policy.MaxAttempts <= 0 {
policy = DefaultRetryPolicy()
}
delay := policy.BaseDelay
for attempt := 1; attempt <= policy.MaxAttempts; attempt++ {
err := fn()
if err == nil {
return nil
}
if attempt == policy.MaxAttempts {
return err
}
// apply jitter
jitter := 1 + (policy.Jitter * (rand.Float64()*2 - 1))
sleepFor := time.Duration(float64(delay) * jitter)
if sleepFor > policy.MaxDelay {
sleepFor = policy.MaxDelay
}
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(sleepFor):
}
delay *= 2
if delay > policy.MaxDelay {
delay = policy.MaxDelay
}
}
return nil
}

View File

@@ -0,0 +1,10 @@
# StellaOps Orchestrator Worker SDK (Python)
Async-friendly SDK for StellaOps workers: claim jobs, acknowledge results, and attach tenant-aware auth headers. The default transport is dependency-free and can be swapped for aiohttp/httpx as needed.
## Quick start
```bash
export ORCH_BASE_URL=http://localhost:8080
export ORCH_API_KEY=dev-token
python sample_worker.py
```

View File

@@ -0,0 +1,9 @@
# Worker SDK (Python) — Task Tracker
| Task ID | Status | Notes | Updated (UTC) |
| --- | --- | --- | --- |
| WORKER-PY-32-001 | DONE | Async Python SDK scaffold with config/auth headers, claim/ack client, sample worker script, and unit tests using stub transport. | 2025-11-17 |
| WORKER-PY-32-002 | DONE | Heartbeat/progress helpers with logging/metrics and cancellation-safe retries. | 2025-11-17 |
| WORKER-PY-33-001 | DONE | Artifact publish/idempotency helpers with checksum hashing and storage adapters. | 2025-11-17 |
| WORKER-PY-33-002 | DONE | Error classification/backoff helper aligned to orchestrator codes and structured failure reports. | 2025-11-17 |
| WORKER-PY-34-001 | DONE | Backfill iteration, watermark handshake, and artifact dedupe verification utilities. | 2025-11-17 |

View File

@@ -0,0 +1,11 @@
[project]
name = "stellaops-orchestrator-worker"
version = "0.1.0"
description = "Async worker SDK for StellaOps Orchestrator"
authors = [{name = "StellaOps"}]
readme = "README.md"
requires-python = ">=3.10"
[build-system]
requires = ["setuptools"]
build-backend = "setuptools.build_meta"

View File

@@ -0,0 +1,41 @@
import asyncio
import os
from stellaops_orchestrator_worker import (
AckJobRequest,
ClaimJobRequest,
Config,
OrchestratorClient,
)
from stellaops_orchestrator_worker.retry import RetryPolicy, retry
async def main():
    """Sample worker: claim one job, report progress, then ack with retries."""
    # Configuration comes from environment variables with local-dev defaults.
    cfg = Config(
        base_url=os.environ.get("ORCH_BASE_URL", "http://localhost:8080"),
        api_key=os.environ.get("ORCH_API_KEY", "dev-token"),
        tenant_id=os.environ.get("ORCH_TENANT", "local-tenant"),
        project_id=os.environ.get("ORCH_PROJECT", "demo-project"),
    )
    client = OrchestratorClient(cfg)
    claim = await client.claim(ClaimJobRequest(worker_id="py-worker", capabilities=["pack-run"]))
    if claim is None:
        # The orchestrator had no job to lease.
        print("no work available")
        return
    # ... perform actual work described by claim.payload ...
    await client.heartbeat(job_id=claim.job_id, lease_id=claim.lease_id)
    await client.progress(job_id=claim.job_id, lease_id=claim.lease_id, pct=50, message="halfway")

    async def _ack():
        # The ack is the critical step, so it is wrapped in the retry helper.
        await client.ack(
            AckJobRequest(job_id=claim.job_id, lease_id=claim.lease_id, status="succeeded"),
        )

    await retry(RetryPolicy(), _ack)
    print(f"acknowledged job {claim.job_id}")


if __name__ == "__main__":
    asyncio.run(main())

View File

@@ -0,0 +1,37 @@
"""Async worker SDK for StellaOps Orchestrator."""
from .client import OrchestratorClient, ClaimJobRequest, AckJobRequest, ClaimJobResponse
from .config import Config
from .metrics import MetricsSink, NoopMetrics
from .transport import Transport, InMemoryTransport, TransportRequest, TransportResponse
from .retry import RetryPolicy, retry
from .storage import publish_artifact, InMemoryStorage, ArtifactPublishResult, Storage
from .errors import ErrorCode, classify_status
from .backfill import Range, WatermarkHandshake, Deduper, execute_range, verify_and_publish_artifact
__all__ = [
"OrchestratorClient",
"ClaimJobRequest",
"ClaimJobResponse",
"AckJobRequest",
"Config",
"MetricsSink",
"NoopMetrics",
"RetryPolicy",
"retry",
"Storage",
"publish_artifact",
"InMemoryStorage",
"ArtifactPublishResult",
"Range",
"WatermarkHandshake",
"Deduper",
"execute_range",
"verify_and_publish_artifact",
"ErrorCode",
"classify_status",
"Transport",
"InMemoryTransport",
"TransportRequest",
"TransportResponse",
]

View File

@@ -0,0 +1,81 @@
from __future__ import annotations
import asyncio
import datetime as dt
from dataclasses import dataclass
from typing import Awaitable, Callable, Optional
from .storage import publish_artifact, ArtifactPublishResult, Storage
@dataclass
class Range:
    """Inclusive backfill window."""

    start: dt.datetime
    end: dt.datetime

    def validate(self) -> None:
        """Raise ValueError when the window ends before it starts."""
        if self.end < self.start:
            raise ValueError("range end before start")


@dataclass
class WatermarkHandshake:
    """Guards that the worker's watermark matches the orchestrator's."""

    expected: str
    current: str

    def validate(self) -> None:
        """Raise ValueError on an empty or mismatched expected watermark."""
        if not self.expected:
            raise ValueError("expected watermark required")
        if self.expected != self.current:
            raise ValueError("watermark mismatch")


class Deduper:
    """Tracks idempotency keys already seen to block duplicate publishes."""

    def __init__(self):
        self._seen: set[str] = set()

    def seen(self, key: str) -> bool:
        """Return True for a repeat key; record new keys and return False."""
        if not key:
            return False
        is_repeat = key in self._seen
        if not is_repeat:
            self._seen.add(key)
        return is_repeat


async def execute_range(r: Range, step: dt.timedelta, fn: Callable[[dt.datetime], Awaitable[None]]) -> None:
    """Invoke fn for every step-sized tick in the inclusive window r."""
    r.validate()
    if step.total_seconds() <= 0:
        raise ValueError("step must be positive")
    tick = r.start
    while tick <= r.end:
        await fn(tick)
        tick += step
async def verify_and_publish_artifact(
    *,
    storage: Storage,
    wm: WatermarkHandshake,
    dedupe: Optional[Deduper],
    job_id: str,
    lease_id: str,
    object_key: str,
    content: bytes,
    content_type: str = "application/octet-stream",
    artifact_type: Optional[str] = None,
    idempotency_key: Optional[str] = None,
) -> ArtifactPublishResult:
    """Publish an artifact after validating the watermark and dedupe guard.

    Raises ValueError on a watermark mismatch or a repeated idempotency key.
    """
    wm.validate()
    duplicate = bool(dedupe and idempotency_key and dedupe.seen(idempotency_key))
    if duplicate:
        raise ValueError("duplicate artifact idempotency key")
    return await publish_artifact(
        storage=storage,
        job_id=job_id,
        lease_id=lease_id,
        object_key=object_key,
        content=content,
        content_type=content_type,
        artifact_type=artifact_type,
        idempotency_key=idempotency_key,
    )

View File

@@ -0,0 +1,111 @@
from __future__ import annotations
import json
from dataclasses import dataclass
from typing import Optional
from urllib.parse import urljoin
from .config import Config
from .metrics import MetricsSink
from .transport import Transport, TransportRequest, TransportResponse
@dataclass
class ClaimJobRequest:
    """A worker's request to lease the next available job."""

    worker_id: str
    capabilities: Optional[list[str]] = None


@dataclass
class ClaimJobResponse:
    """The leased job returned by the orchestrator (None-equivalent on 204)."""

    job_id: str
    lease_id: str
    job_type: Optional[str]
    payload: dict
    expires_at: Optional[str] = None
    retry_after_seconds: Optional[int] = None


@dataclass
class AckJobRequest:
    """Completion (or failure) report for a leased job."""

    job_id: str
    lease_id: str
    status: str
    message: Optional[str] = None
class OrchestratorClient:
    """Async client for job claim/ack/heartbeat/progress operations.

    Requests carry JSON bodies plus auth/tenant/project headers derived from
    the supplied Config; transport and metrics sinks are pluggable for tests.
    """
    def __init__(self, config: Config, *, transport: Optional[Transport] = None):
        config.validate()
        self._cfg = config
        self._transport = transport or config.get_transport()
        self._metrics: MetricsSink = config.get_metrics()
    async def claim(self, request: ClaimJobRequest) -> Optional[ClaimJobResponse]:
        """Attempt to lease a job; returns None when no work is available (204)."""
        if not request.worker_id:
            raise ValueError("worker_id is required")
        body = json.dumps(request.__dict__).encode()
        resp = await self._execute("POST", "/api/jobs/lease", body)
        if resp.status == 204:
            return None
        self._raise_for_status("claim", resp)
        data = json.loads(resp.body)
        self._metrics.inc_claimed()
        return ClaimJobResponse(
            job_id=data["job_id"],
            lease_id=data["lease_id"],
            job_type=data.get("job_type"),
            payload=data.get("payload", {}),
            expires_at=data.get("expires_at"),
            retry_after_seconds=data.get("retry_after_seconds"),
        )
    async def ack(self, request: AckJobRequest) -> None:
        """Acknowledge a job outcome with the given status."""
        if not request.job_id or not request.lease_id or not request.status:
            raise ValueError("job_id, lease_id, and status are required")
        body = json.dumps(request.__dict__).encode()
        resp = await self._execute("POST", f"/api/jobs/{request.job_id}/ack", body)
        self._raise_for_status("ack", resp)
        self._metrics.inc_ack(request.status)
    async def heartbeat(self, *, job_id: str, lease_id: str) -> None:
        """Renew the lease for an in-flight job."""
        if not job_id or not lease_id:
            raise ValueError("job_id and lease_id are required")
        body = json.dumps({"lease_id": lease_id}).encode()
        resp = await self._execute("POST", f"/api/jobs/{job_id}/heartbeat", body)
        if resp.status >= 300:
            # Count the failure before raising so the metric fires exactly once.
            self._metrics.inc_heartbeat_failures()
        self._raise_for_status("heartbeat", resp)
        # latency recorded by caller; keep simple here
    async def progress(self, *, job_id: str, lease_id: str, pct: int, message: Optional[str] = None) -> None:
        """Report percent-complete (0-100) with an optional human-readable note."""
        # Validate identifiers like heartbeat/ack do; an empty job_id would
        # otherwise produce a malformed request URL.
        if not job_id or not lease_id:
            raise ValueError("job_id and lease_id are required")
        if pct < 0 or pct > 100:
            raise ValueError("pct must be 0-100")
        payload = {"lease_id": lease_id, "progress": pct}
        if message:
            payload["message"] = message
        body = json.dumps(payload).encode()
        resp = await self._execute("POST", f"/api/jobs/{job_id}/progress", body)
        self._raise_for_status("progress", resp)
    @staticmethod
    def _raise_for_status(operation: str, resp: TransportResponse) -> None:
        """Raise RuntimeError for any non-success (>= 300) transport response."""
        if resp.status >= 300:
            raise RuntimeError(f"{operation} failed: {resp.status} {resp.body.decode(errors='ignore')}")
    async def _execute(self, method: str, path: str, body: Optional[bytes]) -> TransportResponse:
        # Join against base_url while tolerating trailing/leading slashes.
        url = urljoin(self._cfg.base_url.rstrip("/") + "/", path.lstrip("/"))
        headers = {
            "Accept": "application/json",
            "Content-Type": "application/json",
            "User-Agent": self._cfg.user_agent,
        }
        if self._cfg.api_key:
            headers["Authorization"] = f"Bearer {self._cfg.api_key}"
        if self._cfg.tenant_id:
            headers["X-StellaOps-Tenant"] = self._cfg.tenant_id
        if self._cfg.project_id:
            headers["X-StellaOps-Project"] = self._cfg.project_id
        req = TransportRequest(method=method, url=url, headers=headers, body=body)
        return await self._transport.execute(req)

View File

@@ -0,0 +1,30 @@
from __future__ import annotations
from dataclasses import dataclass
from typing import Optional
from .metrics import MetricsSink, NoopMetrics
from .transport import Transport, default_transport
@dataclass
class Config:
    """SDK configuration."""
    # Orchestrator base URL, e.g. "https://orch.example.com"; required.
    base_url: str
    # Bearer token sent as "Authorization: Bearer <api_key>" when set.
    api_key: Optional[str] = None
    # Propagated as the X-StellaOps-Tenant header when set.
    tenant_id: Optional[str] = None
    # Propagated as the X-StellaOps-Project header when set.
    project_id: Optional[str] = None
    user_agent: str = "stellaops-worker-sdk-py/0.1"
    # Optional overrides, mainly for testing; defaults are supplied lazily.
    transport: Optional[Transport] = None
    metrics: Optional[MetricsSink] = None
    def validate(self) -> None:
        # Only base_url is mandatory; every other field has a usable default.
        if not self.base_url:
            raise ValueError("base_url is required")
    def get_transport(self) -> Transport:
        # Fall back to the stdlib-backed default transport.
        return self.transport or default_transport()
    def get_metrics(self) -> MetricsSink:
        # Fall back to a no-op sink so metric calls are always safe.
        return self.metrics or NoopMetrics()

View File

@@ -0,0 +1,24 @@
from __future__ import annotations
from enum import Enum
class ErrorCode(str, Enum):
    """Coarse classification of HTTP failures for retry/reporting decisions."""
    TEMPORARY = "temporary"
    PERMANENT = "permanent"
    FATAL = "fatal"
    UNAUTHORIZED = "unauthorized"
    QUOTA = "quota_exceeded"
    VALIDATION = "validation"
def classify_status(status: int) -> tuple[ErrorCode | None, bool]:
    """Map an HTTP status code to ``(error_code, retryable)``.

    Returns ``(None, False)`` for success/redirect codes that carry no error.
    """
    if status in (401, 403):
        return ErrorCode.UNAUTHORIZED, False
    if status == 429:
        return ErrorCode.QUOTA, True
    if status == 408:
        # Request Timeout is transient; retrying is safe and expected
        # (previously misclassified as a permanent, non-retryable 4xx).
        return ErrorCode.TEMPORARY, True
    if 500 <= status < 600:
        return ErrorCode.TEMPORARY, True
    if 400 <= status < 500:
        return ErrorCode.PERMANENT, False
    return None, False

View File

@@ -0,0 +1,24 @@
from __future__ import annotations
from typing import Protocol
class MetricsSink(Protocol):
    """Structural interface for SDK metric hooks.

    An implementation can be supplied via ``Config.metrics``; the client
    invokes these counters around claim/ack/heartbeat calls.
    """
    def inc_claimed(self) -> None: ...
    def inc_ack(self, status: str) -> None: ...
    # Not called by OrchestratorClient itself; heartbeat latency is left to callers.
    def observe_heartbeat_latency(self, seconds: float) -> None: ...
    def inc_heartbeat_failures(self) -> None: ...
class NoopMetrics:
    """MetricsSink implementation that silently discards every observation."""

    def inc_claimed(self) -> None:
        pass

    def inc_ack(self, status: str) -> None:
        pass

    def observe_heartbeat_latency(self, seconds: float) -> None:
        pass

    def inc_heartbeat_failures(self) -> None:
        pass

View File

@@ -0,0 +1,34 @@
from __future__ import annotations
import asyncio
import random
from dataclasses import dataclass
from typing import Awaitable, Callable
@dataclass
class RetryPolicy:
max_attempts: int = 5
base_delay: float = 0.2 # seconds
max_delay: float = 5.0 # seconds
jitter: float = 0.2 # +/- 20%
def _jittered(delay: float, jitter: float) -> float:
if jitter <= 0:
return delay
factor = 1 + ((random.random() * 2 - 1) * jitter)
return delay * factor
async def retry(policy: RetryPolicy, fn: Callable[[], Awaitable[None]]) -> None:
delay = policy.base_delay
for attempt in range(1, policy.max_attempts + 1):
try:
await fn()
return
except Exception: # pragma: no cover - caller handles fatal
if attempt == policy.max_attempts:
raise
await asyncio.sleep(min(_jittered(delay, policy.jitter), policy.max_delay))
delay = min(delay * 2, policy.max_delay)

View File

@@ -0,0 +1,56 @@
from __future__ import annotations
import hashlib
from dataclasses import dataclass
from typing import Protocol, Dict, Optional
class Storage(Protocol):
    """Structural interface for artifact object stores."""
    async def put_object(self, key: str, data: bytes, metadata: Dict[str, str]) -> None: ...


@dataclass
class ArtifactPublishResult:
    """SHA-256 hex digest and byte length of a published artifact."""
    sha256: str
    size: int


async def publish_artifact(
    *,
    storage: Storage,
    job_id: str,
    lease_id: str,
    object_key: str,
    content: bytes,
    content_type: str = "application/octet-stream",
    artifact_type: Optional[str] = None,
    idempotency_key: Optional[str] = None,
) -> ArtifactPublishResult:
    """Upload *content* under *object_key* with job-scoped metadata.

    Raises ValueError when required identifiers or the storage are missing;
    returns the content digest and size on success.
    """
    if not job_id or not lease_id:
        raise ValueError("job_id and lease_id are required")
    if not object_key:
        raise ValueError("object_key is required")
    if storage is None:
        raise ValueError("storage is required")
    digest = hashlib.sha256(content).hexdigest()
    metadata = {
        "x-stellaops-job-id": job_id,
        "x-stellaops-lease": lease_id,
        "x-stellaops-ct": content_type,
    }
    for header, value in (("x-stellaops-type", artifact_type), ("x-idempotency-key", idempotency_key)):
        if value:
            metadata[header] = value
    await storage.put_object(object_key, content, metadata)
    return ArtifactPublishResult(sha256=digest, size=len(content))


class InMemoryStorage(Storage):
    """Test double that records every put_object call."""

    def __init__(self):
        self.calls = []

    async def put_object(self, key: str, data: bytes, metadata: Dict[str, str]) -> None:
        self.calls.append((key, data, metadata))

View File

@@ -0,0 +1,164 @@
import asyncio
import json
import unittest
import datetime as dt
from stellaops_orchestrator_worker import (
AckJobRequest,
ClaimJobRequest,
Config,
ErrorCode,
Deduper,
Range,
WatermarkHandshake,
execute_range,
verify_and_publish_artifact,
InMemoryStorage,
InMemoryTransport,
MetricsSink,
OrchestratorClient,
TransportRequest,
TransportResponse,
classify_status,
publish_artifact,
)
class ClientTests(unittest.TestCase):
    """SDK tests driven through in-memory transport/storage doubles.

    Coroutines are executed via asyncio.run so each test owns (and closes)
    its event loop. The previous get_event_loop()/set_event_loop pattern is
    deprecated and made later tests depend on the loop installed by the
    first test to run, which is order-fragile.
    """

    def test_claim_and_ack_headers(self):
        seen = {}
        metric_calls = {"claimed": 0, "ack": 0, "hb_fail": 0}

        class Metrics(MetricsSink):
            def inc_claimed(self) -> None:
                metric_calls["claimed"] += 1

            def inc_ack(self, status: str) -> None:
                metric_calls["ack"] += 1

            def observe_heartbeat_latency(self, seconds: float) -> None:
                metric_calls["latency"] = seconds

            def inc_heartbeat_failures(self) -> None:
                metric_calls["hb_fail"] += 1

        def handler(req: TransportRequest) -> TransportResponse:
            # Route by URL suffix; anything unmatched is treated as the ack call.
            if req.url.endswith("/api/jobs/lease"):
                seen["claim_headers"] = req.headers
                seen["claim_url"] = req.url
                body = json.loads(req.body)
                self.assertEqual(body["worker_id"], "w1")
                payload = {
                    "job_id": "123",
                    "lease_id": "l1",
                    "job_type": "demo",
                    "payload": {"k": "v"},
                }
                return TransportResponse(status=200, headers={}, body=json.dumps(payload).encode())
            if req.url.endswith("/api/jobs/123/heartbeat"):
                return TransportResponse(status=202, headers={}, body=b"")
            if req.url.endswith("/api/jobs/123/progress"):
                return TransportResponse(status=202, headers={}, body=b"")
            seen["ack_headers"] = req.headers
            seen["ack_url"] = req.url
            return TransportResponse(status=202, headers={}, body=b"")

        transport = InMemoryTransport(handler)
        client = OrchestratorClient(
            Config(base_url="http://orch/", api_key="t", tenant_id="tenant-a", project_id="project-1", metrics=Metrics()),
            transport=transport,
        )
        claim = asyncio.run(client.claim(ClaimJobRequest(worker_id="w1", capabilities=["scan"])))
        self.assertEqual(claim.job_id, "123")
        asyncio.run(client.ack(AckJobRequest(job_id="123", lease_id="l1", status="succeeded")))
        asyncio.run(client.heartbeat(job_id="123", lease_id="l1"))
        asyncio.run(client.progress(job_id="123", lease_id="l1", pct=50, message="halfway"))
        headers = seen["claim_headers"]
        self.assertEqual(headers["Authorization"], "Bearer t")
        self.assertEqual(headers["X-StellaOps-Tenant"], "tenant-a")
        self.assertEqual(headers["X-StellaOps-Project"], "project-1")
        self.assertIn("/api/jobs/lease", seen["claim_url"])
        self.assertEqual(metric_calls["claimed"], 1)
        self.assertEqual(metric_calls["ack"], 1)

    def test_missing_worker_rejected(self):
        client = OrchestratorClient(Config(base_url="http://orch"))
        with self.assertRaises(ValueError):
            asyncio.run(client.claim(ClaimJobRequest(worker_id="")))

    def test_publish_artifact(self):
        storage = InMemoryStorage()
        result = asyncio.run(
            publish_artifact(
                storage=storage,
                job_id="j1",
                lease_id="l1",
                object_key="artifacts/j1/out.txt",
                content=b"hello",
                content_type="text/plain",
                artifact_type="log",
                idempotency_key="idem-1",
            )
        )
        self.assertEqual(result.size, 5)
        self.assertEqual(len(storage.calls), 1)
        key, data, metadata = storage.calls[0]
        self.assertEqual(key, "artifacts/j1/out.txt")
        self.assertEqual(data, b"hello")
        self.assertEqual(metadata["x-idempotency-key"], "idem-1")

    def test_classify_status(self):
        code, retry = classify_status(500)
        self.assertEqual(code, ErrorCode.TEMPORARY)
        self.assertTrue(retry)
        code, retry = classify_status(404)
        self.assertEqual(code, ErrorCode.PERMANENT)
        self.assertFalse(retry)

    def test_execute_range_and_watermark(self):
        r = Range(start=dt.datetime(2025, 11, 15), end=dt.datetime(2025, 11, 17))
        hits = []

        async def fn(ts: dt.datetime):
            hits.append(ts.date())

        asyncio.run(execute_range(r, dt.timedelta(days=1), fn))
        self.assertEqual(len(hits), 3)
        with self.assertRaises(ValueError):
            Range(start=r.end, end=r.start - dt.timedelta(days=1)).validate()
        wm = WatermarkHandshake(expected="w1", current="w2")
        with self.assertRaises(ValueError):
            wm.validate()

    def test_verify_and_publish_dedupe(self):
        storage = InMemoryStorage()
        dedupe = Deduper()
        # Pre-record the key so the publish below is a duplicate.
        dedupe.seen("idem-1")
        with self.assertRaises(ValueError):
            asyncio.run(
                verify_and_publish_artifact(
                    storage=storage,
                    wm=WatermarkHandshake(expected="w", current="w"),
                    dedupe=dedupe,
                    job_id="j",
                    lease_id="l",
                    object_key="k",
                    content=b"",
                    idempotency_key="idem-1",
                )
            )
# Allow running this test module directly via `python <file>`.
if __name__ == "__main__":
    unittest.main()

View File

@@ -0,0 +1,62 @@
from __future__ import annotations
import asyncio
import json
import urllib.error
import urllib.request
from dataclasses import dataclass
from typing import Awaitable, Callable, Dict, Optional
@dataclass
class TransportRequest:
    """Outbound HTTP request handed to a Transport implementation."""
    # HTTP verb, e.g. "POST".
    method: str
    url: str
    headers: Dict[str, str]
    # Raw request body; None for body-less requests.
    body: Optional[bytes]
@dataclass
class TransportResponse:
    """Raw HTTP response returned by a Transport implementation."""
    status: int
    headers: Dict[str, str]
    body: bytes
class Transport:
    """Abstract transport interface for HTTP requests."""
    async def execute(self, request: TransportRequest) -> TransportResponse: # pragma: no cover - interface
        raise NotImplementedError
class _StdlibTransport(Transport):
    """Transport backed by urllib.request, executed off-loop via a worker thread.

    urlopen raises HTTPError for non-2xx statuses, which would bypass the
    ``resp.status >= 300`` handling in callers that expect a TransportResponse;
    those errors are converted back into responses so callers always see the
    real status code and body.
    """

    def __init__(self, *, timeout: float = 10.0):
        self._timeout = timeout

    async def execute(self, request: TransportRequest) -> TransportResponse:
        def _do() -> TransportResponse:
            req = urllib.request.Request(
                request.url, data=request.body, method=request.method, headers=request.headers
            )
            try:
                with urllib.request.urlopen(req, timeout=self._timeout) as resp:  # nosec B310: controlled endpoint
                    return TransportResponse(
                        status=resp.status,
                        headers=dict(resp.headers.items()),
                        body=resp.read(),
                    )
            except urllib.error.HTTPError as exc:
                # Surface HTTP error statuses as ordinary responses.
                return TransportResponse(
                    status=exc.code,
                    headers=dict(exc.headers.items()) if exc.headers else {},
                    body=exc.read(),
                )
        return await asyncio.to_thread(_do)
class InMemoryTransport(Transport):
    """Test transport that delegates every request to a user-supplied handler."""

    def __init__(self, handler: Callable[[TransportRequest], TransportResponse]):
        self._handler = handler

    async def execute(self, request: TransportRequest) -> TransportResponse:
        response = self._handler(request)
        return response
def default_transport() -> Transport:
    """Return the stdlib-backed transport used when Config.transport is unset."""
    return _StdlibTransport()

View File

@@ -1,6 +0,0 @@
namespace StellaOps.Orchestrator.Core;
// Empty project-template placeholder type; carries no behavior.
public class Class1
{
}

View File

@@ -0,0 +1,102 @@
using System.Collections.Immutable;
using System.Text.Json.Serialization;
namespace StellaOps.Orchestrator.Core;
/// <summary>
/// Versioned envelope for orchestrator events (default schema "orch.event.v1").
/// JSON property names are pinned via attributes for wire compatibility.
/// </summary>
public sealed record EventEnvelope(
    [property: JsonPropertyName("schemaVersion")] string SchemaVersion,
    [property: JsonPropertyName("eventId")] string EventId,
    [property: JsonPropertyName("eventType")] string EventType,
    [property: JsonPropertyName("occurredAt")] DateTimeOffset OccurredAt,
    [property: JsonPropertyName("idempotencyKey")] string IdempotencyKey,
    [property: JsonPropertyName("correlationId")] string? CorrelationId,
    [property: JsonPropertyName("tenantId")] string TenantId,
    [property: JsonPropertyName("projectId")] string? ProjectId,
    [property: JsonPropertyName("actor")] EventActor Actor,
    [property: JsonPropertyName("job")] EventJob Job,
    [property: JsonPropertyName("metrics")] EventMetrics? Metrics,
    [property: JsonPropertyName("notifier")] EventNotifier? Notifier)
{
    /// <summary>
    /// Creates an envelope with validated required fields and defaults:
    /// <paramref name="occurredAt"/> falls back to UtcNow, <paramref name="eventId"/>
    /// to a new GUID, and the idempotency key to <see cref="ComputeIdempotencyKey"/>.
    /// Blank correlation/project ids are normalized to null.
    /// </summary>
    public static EventEnvelope Create(
        string eventType,
        string tenantId,
        EventJob job,
        EventActor actor,
        string schemaVersion = "orch.event.v1",
        string? correlationId = null,
        string? projectId = null,
        EventMetrics? metrics = null,
        EventNotifier? notifier = null,
        DateTimeOffset? occurredAt = null,
        string? eventId = null,
        string? idempotencyKey = null)
    {
        ArgumentException.ThrowIfNullOrWhiteSpace(eventType);
        ArgumentException.ThrowIfNullOrWhiteSpace(tenantId);
        ArgumentNullException.ThrowIfNull(job);
        ArgumentNullException.ThrowIfNull(actor);
        var occurred = occurredAt ?? DateTimeOffset.UtcNow;
        var evtId = string.IsNullOrWhiteSpace(eventId) ? Guid.NewGuid().ToString() : eventId!;
        // Default to the deterministic (eventType, jobId, attempt) key so
        // redeliveries of the same attempt share an idempotency key.
        var key = string.IsNullOrWhiteSpace(idempotencyKey)
            ? ComputeIdempotencyKey(eventType, job.Id, job.Attempt)
            : idempotencyKey!;
        return new EventEnvelope(
            schemaVersion,
            evtId,
            eventType,
            occurred,
            key,
            string.IsNullOrWhiteSpace(correlationId) ? null : correlationId,
            tenantId,
            string.IsNullOrWhiteSpace(projectId) ? null : projectId,
            actor,
            job,
            metrics,
            notifier);
    }
    /// <summary>
    /// Deterministic key "orch-{eventType}-{jobId}-{attempt}", lowercased, used
    /// to de-duplicate redelivered events.
    /// NOTE(review): ToLowerInvariant also lowercases the job id — confirm job
    /// ids are treated case-insensitively by consumers.
    /// </summary>
    public static string ComputeIdempotencyKey(string eventType, string jobId, int attempt)
    {
        ArgumentException.ThrowIfNullOrWhiteSpace(eventType);
        ArgumentException.ThrowIfNullOrWhiteSpace(jobId);
        return $"orch-{eventType}-{jobId}-{attempt}".ToLowerInvariant();
    }
}
/// <summary>Identity of the principal that produced the event.</summary>
public sealed record EventActor(
    [property: JsonPropertyName("subject")] string Subject,
    [property: JsonPropertyName("scopes")] ImmutableArray<string> Scopes);
/// <summary>Job state snapshot carried inside the envelope.</summary>
public sealed record EventJob(
    [property: JsonPropertyName("id")] string Id,
    [property: JsonPropertyName("type")] string Type,
    [property: JsonPropertyName("runId")] string? RunId,
    [property: JsonPropertyName("attempt")] int Attempt,
    [property: JsonPropertyName("leaseId")] string? LeaseId,
    [property: JsonPropertyName("taskRunnerId")] string? TaskRunnerId,
    [property: JsonPropertyName("status")] string Status,
    [property: JsonPropertyName("reason")] string? Reason,
    [property: JsonPropertyName("payloadDigest")] string? PayloadDigest,
    [property: JsonPropertyName("artifacts")] ImmutableArray<EventArtifact> Artifacts,
    [property: JsonPropertyName("provenance")] ImmutableDictionary<string, string>? Provenance);
/// <summary>Artifact reference: URI plus content digest and optional MIME type.</summary>
public sealed record EventArtifact(
    [property: JsonPropertyName("uri")] string Uri,
    [property: JsonPropertyName("digest")] string Digest,
    [property: JsonPropertyName("mime")] string? Mime);
/// <summary>Optional timing/backoff measurements attached to an event.</summary>
public sealed record EventMetrics(
    [property: JsonPropertyName("durationSeconds")] double? DurationSeconds,
    [property: JsonPropertyName("logStreamLagSeconds")] double? LogStreamLagSeconds,
    [property: JsonPropertyName("backoffSeconds")] double? BackoffSeconds);
/// <summary>Notifier routing information, including optional replay ordering.</summary>
public sealed record EventNotifier(
    [property: JsonPropertyName("channel")] string Channel,
    [property: JsonPropertyName("delivery")] string Delivery,
    [property: JsonPropertyName("replay")] EventReplayInfo? Replay);
/// <summary>Position of a replayed event within a replay batch.</summary>
public sealed record EventReplayInfo(
    [property: JsonPropertyName("ordinal")] int Ordinal,
    [property: JsonPropertyName("total")] int Total);

View File

@@ -0,0 +1,55 @@
using System.Collections.Immutable;
using System.Text.Json;
using StellaOps.Orchestrator.Core;
namespace StellaOps.Orchestrator.Tests;
public class EventEnvelopeTests
{
    [Fact]
    public void ComputeIdempotencyKey_IsDeterministicAndLowercase()
    {
        // The key format is "orch-{eventType}-{jobId}-{attempt}", lowercased.
        var key = EventEnvelope.ComputeIdempotencyKey("Job.Completed", "job_abc", 3);
        Assert.Equal("orch-job.completed-job_abc-3", key);
    }
    [Fact]
    public void Create_PopulatesDefaultsAndSerializes()
    {
        var job = new EventJob(
            Id: "job_123",
            Type: "pack-run",
            RunId: "run_123",
            Attempt: 2,
            LeaseId: "lease_1",
            TaskRunnerId: "tr_9",
            Status: "completed",
            Reason: null,
            PayloadDigest: "sha256:deadbeef",
            Artifacts: ImmutableArray.Create(new EventArtifact("s3://bucket/obj", "sha256:beef", "application/json")),
            Provenance: ImmutableDictionary<string, string>.Empty);
        var actor = new EventActor("worker-sdk-go", ImmutableArray.Create("orch:quota"));
        // Create() should fill eventId (GUID), schema version, and the computed idempotency key.
        var envelope = EventEnvelope.Create(
            eventType: "job.completed",
            tenantId: "tenant-alpha",
            job: job,
            actor: actor,
            projectId: "proj-1",
            correlationId: "corr-123");
        Assert.False(string.IsNullOrWhiteSpace(envelope.EventId));
        Assert.Equal("orch-job.completed-job_123-2", envelope.IdempotencyKey);
        Assert.Equal("orch.event.v1", envelope.SchemaVersion);
        Assert.Equal("tenant-alpha", envelope.TenantId);
        Assert.Equal("proj-1", envelope.ProjectId);
        // Round-trip through System.Text.Json to confirm attribute-driven names survive.
        var json = JsonSerializer.Serialize(envelope);
        var roundtrip = JsonSerializer.Deserialize<EventEnvelope>(json);
        Assert.NotNull(roundtrip);
        Assert.Equal(envelope.IdempotencyKey, roundtrip!.IdempotencyKey);
        Assert.Equal(envelope.Job.Id, roundtrip.Job.Id);
        Assert.Equal(envelope.Actor.Subject, roundtrip.Actor.Subject);
    }
}

View File

@@ -1,10 +0,0 @@
namespace StellaOps.Orchestrator.Tests;
// Empty template test left over from project scaffolding; asserts nothing.
public class UnitTest1
{
    [Fact]
    public void Test1()
    {
    }
}

View File

@@ -1,41 +1,19 @@
// ASP.NET minimal-API project template (weather forecast sample endpoints).
var builder = WebApplication.CreateBuilder(args);
// Add services to the container.
// Learn more about configuring OpenAPI at https://aka.ms/aspnet/openapi
builder.Services.AddOpenApi();
var app = builder.Build();
// Configure the HTTP request pipeline.
if (app.Environment.IsDevelopment())
{
    app.MapOpenApi();
}
app.UseHttpsRedirection();
var summaries = new[]
{
    "Freezing", "Bracing", "Chilly", "Cool", "Mild", "Warm", "Balmy", "Hot", "Sweltering", "Scorching"
};
// Sample endpoint returning five random forecasts.
app.MapGet("/weatherforecast", () =>
{
    var forecast = Enumerable.Range(1, 5).Select(index =>
        new WeatherForecast
        (
            DateOnly.FromDateTime(DateTime.Now.AddDays(index)),
            Random.Shared.Next(-20, 55),
            summaries[Random.Shared.Next(summaries.Length)]
        ))
        .ToArray();
    return forecast;
})
.WithName("GetWeatherForecast");
app.Run();
// Template DTO; TemperatureF is derived from TemperatureC.
record WeatherForecast(DateOnly Date, int TemperatureC, string? Summary)
{
    public int TemperatureF => 32 + (int)(TemperatureC / 0.5556);
}
var builder = WebApplication.CreateBuilder(args);
// Generate lowercase route URLs for consistency.
builder.Services.AddRouting(options => options.LowercaseUrls = true);
builder.Services.AddEndpointsApiExplorer();
builder.Services.AddOpenApi();
var app = builder.Build();
// Expose the OpenAPI document only in development.
if (app.Environment.IsDevelopment())
{
    app.MapOpenApi();
}
// Liveness and readiness probes.
app.MapGet("/healthz", () => Results.Json(new { status = "ok" }));
app.MapGet("/readyz", () => Results.Json(new { status = "ready" }));
app.Run();
// Exposes the entry-point type to tests (commonly via WebApplicationFactory).
public partial class Program;