Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,5 @@ Currently supported are the following metrics:
| vllm:lora_requests_info | Running stats on LoRA requests |
| vllm:kv_cache_usage_perc | The fraction of KV-cache blocks currently in use (from 0 to 1) |
| vllm:cache_config_info | Information of the LLMEngine CacheConfig |
| vllm:prefix_cache_hits | Prefix cache hits, in terms of number of cached tokens |
| vllm:prefix_cache_queries | Prefix cache queries, in terms of number of queried tokens |
18 changes: 18 additions & 0 deletions pkg/common/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,11 @@ type Metrics struct {
ReqPrefillTimeBucketValues []int `yaml:"prefill-time-buckets-values" json:"prefill-time-buckets-values"`
// ReqDecodeTimeBucketValues is an array of values for request decode time buckets.
ReqDecodeTimeBucketValues []int `yaml:"decode-time-buckets-values" json:"decode-time-buckets-values"`

// PrefixCacheHits is the initial value for the prefix cache hits counter (in tokens)
PrefixCacheHits *int64 `yaml:"prefix-cache-hits" json:"prefix-cache-hits,omitempty"`
// PrefixCacheQueries is the initial value for the prefix cache queries counter (in tokens)
PrefixCacheQueries *int64 `yaml:"prefix-cache-queries" json:"prefix-cache-queries,omitempty"`
}

type LorasMetrics struct {
Expand Down Expand Up @@ -689,6 +694,19 @@ func (c *Configuration) validate() error {
return errors.New("fake metrics decode-time-buckets-values cannot contain negative values")
}
}
if c.FakeMetrics.PrefixCacheHits != nil && *c.FakeMetrics.PrefixCacheHits < 0 {
return errors.New("fake metrics prefix-cache-hits cannot be negative")
}
if c.FakeMetrics.PrefixCacheQueries != nil && *c.FakeMetrics.PrefixCacheQueries < 0 {
return errors.New("fake metrics prefix-cache-queries cannot be negative")
}
if (c.FakeMetrics.PrefixCacheHits == nil) != (c.FakeMetrics.PrefixCacheQueries == nil) {
return errors.New("fake metrics prefix-cache-hits and prefix-cache-queries must be specified together")
}
if c.FakeMetrics.PrefixCacheHits != nil && c.FakeMetrics.PrefixCacheQueries != nil &&
*c.FakeMetrics.PrefixCacheHits > *c.FakeMetrics.PrefixCacheQueries {
return errors.New("fake metrics prefix-cache-hits cannot exceed prefix-cache-queries")
}
}

if c.DPSize < 1 || c.DPSize > 8 {
Expand Down
30 changes: 30 additions & 0 deletions pkg/common/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,6 +555,36 @@ var _ = Describe("Simulator configuration", func() {
"--config", "../../manifests/config.yaml"},
expectedError: "fake metrics request-max-generation-tokens cannot contain negative values",
},
{
name: "invalid fake metrics: negative prefix-cache-hits",
args: []string{"cmd", "--fake-metrics", "{\"prefix-cache-hits\":-5,\"prefix-cache-queries\":10}",
"--config", "../../manifests/config.yaml"},
expectedError: "fake metrics prefix-cache-hits cannot be negative",
},
{
name: "invalid fake metrics: negative prefix-cache-queries",
args: []string{"cmd", "--fake-metrics", "{\"prefix-cache-hits\":0,\"prefix-cache-queries\":-1}",
"--config", "../../manifests/config.yaml"},
expectedError: "fake metrics prefix-cache-queries cannot be negative",
},
{
name: "invalid fake metrics: prefix-cache-hits without prefix-cache-queries",
args: []string{"cmd", "--fake-metrics", "{\"prefix-cache-hits\":100}",
"--config", "../../manifests/config.yaml"},
expectedError: "fake metrics prefix-cache-hits and prefix-cache-queries must be specified together",
},
{
name: "invalid fake metrics: prefix-cache-queries without prefix-cache-hits",
args: []string{"cmd", "--fake-metrics", "{\"prefix-cache-queries\":100}",
"--config", "../../manifests/config.yaml"},
expectedError: "fake metrics prefix-cache-hits and prefix-cache-queries must be specified together",
},
{
name: "invalid fake metrics: prefix-cache-hits exceeds prefix-cache-queries",
args: []string{"cmd", "--fake-metrics", "{\"prefix-cache-hits\":100,\"prefix-cache-queries\":50}",
"--config", "../../manifests/config.yaml"},
expectedError: "fake metrics prefix-cache-hits cannot exceed prefix-cache-queries",
},
{
name: "invalid echo mode with dataset",
args: []string{"cmd", "--model", "test", "--dataset-path", "my/path",
Expand Down
43 changes: 31 additions & 12 deletions pkg/kv-cache/kv_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,26 @@ import (
"github.com/llm-d/llm-d-kv-cache-manager/pkg/kvcache/kvblock"
)

// PrefixCacheStats holds token-level prefix cache statistics for a single request,
// matching vLLM's PrefixCacheStats semantics where both fields count tokens.
// One value is emitted per request at request-start time.
type PrefixCacheStats struct {
	// QueriedTokens is the total number of prompt tokens checked against the cache
	// for this request.
	QueriedTokens int
	// CachedTokens is the number of prompt tokens that were already cached;
	// always <= QueriedTokens for a given request.
	CachedTokens int
}

// KVCacheHelper simulates KV-cache behavior for incoming requests: it tokenizes
// prompts, maps tokens to KV block keys, tracks block residency, and reports
// per-request prefix-cache statistics.
type KVCacheHelper struct {
	// tokenizer converts request prompts into token IDs
	tokenizer       tokenizer.Tokenizer
	tokensProcessor kvblock.TokenProcessor // turns tokens to kv block keys
	logger          logr.Logger
	// blockCache tracks which KV blocks are currently cached
	blockCache *blockCache
	// blockSize is the number of tokens per KV block (config.TokenBlockSize)
	blockSize int
	// prefixCacheStatsChan receives one PrefixCacheStats per request; may be nil,
	// in which case no stats are reported
	prefixCacheStatsChan chan PrefixCacheStats
}

func NewKVCacheHelper(config *common.Configuration, logger logr.Logger, usageChan chan float64,
tokenizer tokenizer.Tokenizer) (*KVCacheHelper, error) {
prefixCacheStatsChan chan PrefixCacheStats, tokenizer tokenizer.Tokenizer) (*KVCacheHelper, error) {
tokenProcConfig := kvblock.DefaultTokenProcessorConfig()
tokenProcConfig.BlockSize = config.TokenBlockSize
if config.HashSeed != "" {
Expand All @@ -50,11 +60,12 @@ func NewKVCacheHelper(config *common.Configuration, logger logr.Logger, usageCha
return nil, fmt.Errorf("failed to create block cache: %w", err)
}
return &KVCacheHelper{
tokenizer: tokenizer,
tokensProcessor: tokensProcessor,
blockCache: blockCache,
logger: logger,
blockSize: config.TokenBlockSize,
tokenizer: tokenizer,
tokensProcessor: tokensProcessor,
blockCache: blockCache,
logger: logger,
blockSize: config.TokenBlockSize,
prefixCacheStatsChan: prefixCacheStatsChan,
}, nil
}

Expand Down Expand Up @@ -92,7 +103,8 @@ func (h *KVCacheHelper) OnRequestStart(vllmReq openaiserverapi.Request) (float64
return 0, err
}

vllmReq.SetNumberOfCachedPromptTokens(nBlocksAlreadyInCache * h.blockSize)
cachedTokens := nBlocksAlreadyInCache * h.blockSize
vllmReq.SetNumberOfCachedPromptTokens(cachedTokens)

totalBlocks := len(blockHashes)
cachedBlocks := h.blockCache.countCachedBlockPrefix(blockHashes)
Expand All @@ -102,6 +114,13 @@ func (h *KVCacheHelper) OnRequestStart(vllmReq openaiserverapi.Request) (float64
hitRate = float64(cachedBlocks) / float64(totalBlocks)
}

if h.prefixCacheStatsChan != nil {
common.WriteToChannel(h.prefixCacheStatsChan, PrefixCacheStats{
QueriedTokens: len(tokens),
CachedTokens: cachedTokens,
}, h.logger, "prefixCacheStatsChan")
}

return hitRate, nil
}

Expand Down
8 changes: 7 additions & 1 deletion pkg/llm-d-inference-sim/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,12 @@ type metricsData struct {
requestParamsMaxTokens *prometheus.HistogramVec
// requestSuccessTotal is prometheus counter for total number of successful requests
requestSuccessTotal *prometheus.CounterVec
// prefixCacheHits is prometheus counter for total cached tokens (prefix cache hits)
prefixCacheHits *prometheus.CounterVec
// prefixCacheQueries is prometheus counter for total queried tokens (prefix cache queries)
prefixCacheQueries *prometheus.CounterVec
// prefixCacheStatsChan is a channel to update prefix cache hit/query counters
prefixCacheStatsChan chan kvcache.PrefixCacheStats
}

// LoRAs usage info for requests execution
Expand Down Expand Up @@ -190,7 +196,7 @@ func (s *simContext) initialize(ctx context.Context) error {

if s.config.EnableKVCache {
s.kvcacheHelper, err = kvcache.NewKVCacheHelper(s.config, s.logger,
s.metrics.kvCacheUsageChan, s.tokenizer)
s.metrics.kvCacheUsageChan, s.metrics.prefixCacheStatsChan, s.tokenizer)
if err != nil {
return err
}
Expand Down
64 changes: 64 additions & 0 deletions pkg/llm-d-inference-sim/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/prometheus/client_golang/prometheus"

"github.com/llm-d/llm-d-inference-sim/pkg/common"
kvcache "github.com/llm-d/llm-d-inference-sim/pkg/kv-cache"
vllmapi "github.com/llm-d/llm-d-inference-sim/pkg/vllm-api"
)

Expand All @@ -53,6 +54,8 @@ const (
reqWaitingMetricName = "vllm:num_requests_waiting"
kvCacheUsageMetricName = "vllm:kv_cache_usage_perc"
cacheConfigName = "vllm:cache_config_info"
prefixCacheHitsMetricName = "vllm:prefix_cache_hits"
prefixCacheQueriesMetricName = "vllm:prefix_cache_queries"
)

// createAndRegisterPrometheus creates and registers prometheus metrics used by vLLM simulator
Expand Down Expand Up @@ -271,6 +274,35 @@ func (s *simContext) createAndRegisterPrometheus(ctx context.Context) error {
s.metrics.kvCacheUsageChan = make(chan float64, maxNumberOfRequests)
go s.kvCacheUsageUpdater(ctx)

s.metrics.prefixCacheHits = prometheus.NewCounterVec(
prometheus.CounterOpts{
Subsystem: "",
Name: prefixCacheHitsMetricName,
Help: "Prefix cache hits, in terms of number of cached tokens.",
},
[]string{vllmapi.PromLabelModelName},
)
if err := s.metrics.registry.Register(s.metrics.prefixCacheHits); err != nil {
s.logger.Error(err, "prometheus prefix_cache_hits counter register failed")
return err
}

s.metrics.prefixCacheQueries = prometheus.NewCounterVec(
prometheus.CounterOpts{
Subsystem: "",
Name: prefixCacheQueriesMetricName,
Help: "Prefix cache queries, in terms of number of queried tokens.",
},
[]string{vllmapi.PromLabelModelName},
)
if err := s.metrics.registry.Register(s.metrics.prefixCacheQueries); err != nil {
s.logger.Error(err, "prometheus prefix_cache_queries counter register failed")
return err
}

s.metrics.prefixCacheStatsChan = make(chan kvcache.PrefixCacheStats, maxNumberOfRequests)
go s.prefixCacheStatsUpdater(ctx)

s.metrics.requestPromptTokens = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Subsystem: "",
Expand Down Expand Up @@ -457,6 +489,12 @@ func (s *simContext) setInitialPrometheusMetrics(cacheConfig *prometheus.GaugeVe
if s.config.FakeMetrics.ReqDecodeTimeBucketValues != nil {
s.initFakeHistogram(s.metrics.reqDecodeTime, common.RequestLatencyBucketsBoundaries, s.config.FakeMetrics.ReqDecodeTimeBucketValues)
}
if s.config.FakeMetrics.PrefixCacheQueries != nil {
s.metrics.prefixCacheQueries.WithLabelValues(modelName).Add(float64(*s.config.FakeMetrics.PrefixCacheQueries))
}
if s.config.FakeMetrics.PrefixCacheHits != nil {
s.metrics.prefixCacheHits.WithLabelValues(modelName).Add(float64(*s.config.FakeMetrics.PrefixCacheHits))
}
}

s.metrics.runningRequests.WithLabelValues(modelName).Set(nRunningReqs)
Expand Down Expand Up @@ -621,6 +659,32 @@ func (s *simContext) kvCacheUsageUpdater(ctx context.Context) {
}
}

// prefixCacheStatsUpdater increments prefix cache hit/query counters by listening on the relevant channel
func (s *simContext) prefixCacheStatsUpdater(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case stats := <-s.metrics.prefixCacheStatsChan:
s.reportPrefixCacheStats(stats)
}
}
}

// reportPrefixCacheStats adds one request's token counts to the prefix cache
// hit/query counters. When fake metrics are configured, the counters are owned
// by the fake-metrics initialization and real updates are suppressed.
func (s *simContext) reportPrefixCacheStats(stats kvcache.PrefixCacheStats) {
	if s.config.FakeMetrics != nil {
		return
	}

	model := s.getDisplayedModelName(s.config.Model)

	// Counters may be nil if metrics registration was skipped; guard each one.
	if queries := s.metrics.prefixCacheQueries; queries != nil {
		queries.WithLabelValues(model).Add(float64(stats.QueriedTokens))
	}
	if hits := s.metrics.prefixCacheHits; hits != nil {
		hits.WithLabelValues(model).Add(float64(stats.CachedTokens))
	}
}

// ttftUpdater updates the time to first token metric by listening on the relevant channel
func (s *simContext) ttftUpdater(ctx context.Context) {
for {
Expand Down
Loading
Loading