Skip to content

Commit

Permalink
feat: improve compaction job state management (#3519)
Browse files Browse the repository at this point in the history
* Rename compaction 'pre queue' to 'block queue'

* Minor refactoring of compaction planning, a few unit tests

* Verify the db and in-memory states match

* Improve consistency of compaction job status updates, add tests

* Prioritize compaction level over expiry

* Simplify state updates for compaction job polling

* Move a few constants to config

* Add test for handling compaction failures

* Restore compaction metrics

* Remove unused functions, error return value

* Improve logging, naming, add a few comments

* Lock all segments when replacing source blocks with compacted blocks

* Keep cancelled jobs in the queue / db
  • Loading branch information
aleks-p authored Aug 28, 2024
1 parent 366d582 commit 32621d5
Show file tree
Hide file tree
Showing 19 changed files with 1,189 additions and 511 deletions.
1 change: 1 addition & 0 deletions api/compactor/v1/compactor.proto
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ enum CompactionStatus {
COMPACTION_STATUS_IN_PROGRESS = 1;
COMPACTION_STATUS_SUCCESS = 2;
COMPACTION_STATUS_FAILURE = 3;
+ COMPACTION_STATUS_CANCELLED = 4;
}

message CompletedJob {
Expand Down
61 changes: 33 additions & 28 deletions api/gen/proto/go/compactor/v1/compactor.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion api/openapiv2/gen/phlare.swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -738,7 +738,8 @@
"COMPACTION_STATUS_UNSPECIFIED",
"COMPACTION_STATUS_IN_PROGRESS",
"COMPACTION_STATUS_SUCCESS",
- "COMPACTION_STATUS_FAILURE"
+ "COMPACTION_STATUS_FAILURE",
+ "COMPACTION_STATUS_CANCELLED"
],
"default": "COMPACTION_STATUS_UNSPECIFIED"
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -633,6 +633,10 @@ lifecycler:
# values:
#
# Secure Ciphers:
+ # - TLS_RSA_WITH_AES_128_CBC_SHA
+ # - TLS_RSA_WITH_AES_256_CBC_SHA
+ # - TLS_RSA_WITH_AES_128_GCM_SHA256
+ # - TLS_RSA_WITH_AES_256_GCM_SHA384
# - TLS_AES_128_GCM_SHA256
# - TLS_AES_256_GCM_SHA384
# - TLS_CHACHA20_POLY1305_SHA256
Expand All @@ -650,11 +654,7 @@ lifecycler:
# Insecure Ciphers:
# - TLS_RSA_WITH_RC4_128_SHA
# - TLS_RSA_WITH_3DES_EDE_CBC_SHA
- # - TLS_RSA_WITH_AES_128_CBC_SHA
- # - TLS_RSA_WITH_AES_256_CBC_SHA
# - TLS_RSA_WITH_AES_128_CBC_SHA256
- # - TLS_RSA_WITH_AES_128_GCM_SHA256
- # - TLS_RSA_WITH_AES_256_GCM_SHA384
# - TLS_ECDHE_ECDSA_WITH_RC4_128_SHA
# - TLS_ECDHE_RSA_WITH_RC4_128_SHA
# - TLS_ECDHE_RSA_WITH_3DES_EDE_CBC_SHA
Expand Down Expand Up @@ -990,6 +990,10 @@ sharding_ring:
# values:
#
# Secure Ciphers:
+ # - TLS_RSA_WITH_AES_128_CBC_SHA
+ # - TLS_RSA_WITH_AES_256_CBC_SHA
+ # - TLS_RSA_WITH_AES_128_GCM_SHA256
+ # - TLS_RSA_WITH_AES_256_GCM_SHA384
# - TLS_AES_128_GCM_SHA256
# - TLS_AES_256_GCM_SHA384
# - TLS_CHACHA20_POLY1305_SHA256
Expand All @@ -1007,11 +1011,7 @@ sharding_ring:
# Insecure Ciphers:
# - TLS_RSA_WITH_RC4_128_SHA
# - TLS_RSA_WITH_3DES_EDE_CBC_SHA
- # - TLS_RSA_WITH_AES_128_CBC_SHA
- # - TLS_RSA_WITH_AES_256_CBC_SHA
# - TLS_RSA_WITH_AES_128_CBC_SHA256
- # - TLS_RSA_WITH_AES_128_GCM_SHA256
- # - TLS_RSA_WITH_AES_256_GCM_SHA384
# - TLS_ECDHE_ECDSA_WITH_RC4_128_SHA
# - TLS_ECDHE_RSA_WITH_RC4_128_SHA
# - TLS_ECDHE_RSA_WITH_3DES_EDE_CBC_SHA
Expand Down Expand Up @@ -1339,6 +1339,10 @@ sharding_ring:
# values:
#
# Secure Ciphers:
+ # - TLS_RSA_WITH_AES_128_CBC_SHA
+ # - TLS_RSA_WITH_AES_256_CBC_SHA
+ # - TLS_RSA_WITH_AES_128_GCM_SHA256
+ # - TLS_RSA_WITH_AES_256_GCM_SHA384
# - TLS_AES_128_GCM_SHA256
# - TLS_AES_256_GCM_SHA384
# - TLS_CHACHA20_POLY1305_SHA256
Expand All @@ -1356,11 +1360,7 @@ sharding_ring:
# Insecure Ciphers:
# - TLS_RSA_WITH_RC4_128_SHA
# - TLS_RSA_WITH_3DES_EDE_CBC_SHA
- # - TLS_RSA_WITH_AES_128_CBC_SHA
- # - TLS_RSA_WITH_AES_256_CBC_SHA
# - TLS_RSA_WITH_AES_128_CBC_SHA256
- # - TLS_RSA_WITH_AES_128_GCM_SHA256
- # - TLS_RSA_WITH_AES_256_GCM_SHA384
# - TLS_ECDHE_ECDSA_WITH_RC4_128_SHA
# - TLS_ECDHE_RSA_WITH_RC4_128_SHA
# - TLS_ECDHE_RSA_WITH_3DES_EDE_CBC_SHA
Expand Down Expand Up @@ -1547,6 +1547,10 @@ backoff_config:
# Override the default cipher suite list (separated by commas). Allowed values:
#
# Secure Ciphers:
+ # - TLS_RSA_WITH_AES_128_CBC_SHA
+ # - TLS_RSA_WITH_AES_256_CBC_SHA
+ # - TLS_RSA_WITH_AES_128_GCM_SHA256
+ # - TLS_RSA_WITH_AES_256_GCM_SHA384
# - TLS_AES_128_GCM_SHA256
# - TLS_AES_256_GCM_SHA384
# - TLS_CHACHA20_POLY1305_SHA256
Expand All @@ -1564,11 +1568,7 @@ backoff_config:
# Insecure Ciphers:
# - TLS_RSA_WITH_RC4_128_SHA
# - TLS_RSA_WITH_3DES_EDE_CBC_SHA
- # - TLS_RSA_WITH_AES_128_CBC_SHA
- # - TLS_RSA_WITH_AES_256_CBC_SHA
# - TLS_RSA_WITH_AES_128_CBC_SHA256
- # - TLS_RSA_WITH_AES_128_GCM_SHA256
- # - TLS_RSA_WITH_AES_256_GCM_SHA384
# - TLS_ECDHE_ECDSA_WITH_RC4_128_SHA
# - TLS_ECDHE_RSA_WITH_RC4_128_SHA
# - TLS_ECDHE_RSA_WITH_3DES_EDE_CBC_SHA
Expand Down Expand Up @@ -1760,6 +1760,10 @@ The `memberlist` block configures the Gossip memberlist.
# Override the default cipher suite list (separated by commas). Allowed values:
#
# Secure Ciphers:
+ # - TLS_RSA_WITH_AES_128_CBC_SHA
+ # - TLS_RSA_WITH_AES_256_CBC_SHA
+ # - TLS_RSA_WITH_AES_128_GCM_SHA256
+ # - TLS_RSA_WITH_AES_256_GCM_SHA384
# - TLS_AES_128_GCM_SHA256
# - TLS_AES_256_GCM_SHA384
# - TLS_CHACHA20_POLY1305_SHA256
Expand All @@ -1777,11 +1781,7 @@ The `memberlist` block configures the Gossip memberlist.
# Insecure Ciphers:
# - TLS_RSA_WITH_RC4_128_SHA
# - TLS_RSA_WITH_3DES_EDE_CBC_SHA
- # - TLS_RSA_WITH_AES_128_CBC_SHA
- # - TLS_RSA_WITH_AES_256_CBC_SHA
# - TLS_RSA_WITH_AES_128_CBC_SHA256
- # - TLS_RSA_WITH_AES_128_GCM_SHA256
- # - TLS_RSA_WITH_AES_256_GCM_SHA384
# - TLS_ECDHE_ECDSA_WITH_RC4_128_SHA
# - TLS_ECDHE_RSA_WITH_RC4_128_SHA
# - TLS_ECDHE_RSA_WITH_3DES_EDE_CBC_SHA
Expand Down
12 changes: 7 additions & 5 deletions pkg/experiment/compactor/compaction_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,17 @@ type Worker struct {
}

type Config struct {
- JobCapacity int `yaml:"job_capacity"`
- SmallObjectSize int `yaml:"small_object_size_bytes"`
- TempDir string `yaml:"temp_dir"`
+ JobCapacity int `yaml:"job_capacity"`
+ JobPollInterval time.Duration `yaml:"job_poll_interval"`
+ SmallObjectSize int `yaml:"small_object_size_bytes"`
+ TempDir string `yaml:"temp_dir"`
}

func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
const prefix = "compaction-worker."
tempdir := filepath.Join(os.TempDir(), "pyroscope-compactor")
- f.IntVar(&cfg.JobCapacity, prefix+"job-capacity", 3, "How many concurrent jobs will a worker run at most.")
+ f.IntVar(&cfg.JobCapacity, prefix+"job-capacity", 3, "How many concurrent jobs will a compaction worker run at most.")
+ f.DurationVar(&cfg.JobPollInterval, prefix+"job-poll-interval", 5*time.Second, "How often will a compaction worker poll for jobs.")
f.IntVar(&cfg.SmallObjectSize, prefix+"small-object-size-bytes", 8<<20, "Size of the object that can be loaded in memory.")
f.StringVar(&cfg.TempDir, prefix+"temp-dir", tempdir, "Temporary directory for compaction jobs.")
}
Expand Down Expand Up @@ -81,7 +83,7 @@ func (w *Worker) starting(ctx context.Context) (err error) {
}

func (w *Worker) running(ctx context.Context) error {
- ticker := time.NewTicker(5 * time.Second)
+ ticker := time.NewTicker(w.config.JobPollInterval)
defer ticker.Stop()
go func() {
for {
Expand Down
Loading

0 comments on commit 32621d5

Please sign in to comment.