
Commit

merge main
Signed-off-by: Owen Diehl <[email protected]>
owen-d committed Feb 1, 2024
2 parents a0ec9cc + 1191f88 commit 7172d6b
Showing 11 changed files with 170 additions and 82 deletions.
38 changes: 24 additions & 14 deletions docs/sources/get-started/labels/structured-metadata.md
@@ -6,24 +6,22 @@ description: Describes how to enable structure metadata for logs and how to quer
# What is structured metadata

{{% admonition type="warning" %}}
Structured metadata is an experimental feature and is subject to change in future releases of Grafana Loki. This feature is not yet available for Cloud Logs users.
{{% /admonition %}}

{{% admonition type="warning" %}}
Structured metadata was added to chunk format V4, which is used if the schema version is greater than or equal to `13`. (See [Schema Config]({{< relref "../../storage#schema-config" >}}) for more details about schema versions.)
{{% /admonition %}}
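
As a reference point, a minimal `schema_config` period that satisfies this requirement could look like the following sketch — the date, stores, and index prefix are placeholders, not values from this commit:

```yaml
schema_config:
  configs:
    - from: "2024-01-01"       # placeholder start date for this period
      store: tsdb
      object_store: filesystem
      schema: v13              # v13 or later enables structured metadata (chunk format V4)
      index:
        prefix: index_
        period: 24h
```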

Selecting proper, low cardinality labels is critical to operating and querying Loki effectively. Some metadata, especially infrastructure-related metadata, can be difficult to embed in log lines, and is too high in cardinality to store effectively as indexed labels (doing so would reduce the performance of the index).

Structured metadata is a way to attach metadata to logs without indexing them or including them in the log line content itself. Examples of useful metadata are
Kubernetes pod names, process IDs, or any other label that is often used in queries but has high cardinality and is expensive
to extract at query time.

Structured metadata can also be used to query commonly needed metadata from log lines without needing to apply a parser at query time. Large JSON blobs or a poorly written query using complex regex patterns, for example, come with a high performance cost. Examples of useful metadata include trace IDs or user IDs.

## Attaching structured metadata to log lines

You have the option to attach structured metadata to log lines in the push payload along with each log line and the timestamp.
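
As an illustration, a push payload carrying structured metadata can look like the following sketch — the stream labels, timestamp, and metadata values here are invented, and the metadata object rides as the third element of each entry alongside the timestamp and log line:

```json
{
  "streams": [
    {
      "stream": { "job": "example" },
      "values": [
        [
          "1706745600000000000",
          "GET /api/v1/items 200",
          { "trace_id": "0242ac120002", "pod": "myservice-abc1234-56789" }
        ]
      ]
    }
  ]
}
```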
@@ -34,25 +32,37 @@ See the [Promtail: Structured metadata stage]({{< relref "../../send-data/promta

With Loki version 1.2.0, support for structured metadata has been added to the Logstash output plugin. For more information, see [logstash]({{< relref "../../send-data/logstash/_index.md" >}}).

{{% admonition type="warning" %}}
There are default limits for how much structured metadata can be attached per log line:
```
# Maximum size accepted for structured metadata per log line.
# CLI flag: -limits.max-structured-metadata-size
[max_structured_metadata_size: <int> | default = 64KB]
# Maximum number of structured metadata entries per log line.
# CLI flag: -limits.max-structured-metadata-entries-count
[max_structured_metadata_entries_count: <int> | default = 128]
```
{{% /admonition %}}
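
These limits correspond to keys under `limits_config`; a sketch of raising them (the values below are arbitrary, for illustration only):

```yaml
limits_config:
  max_structured_metadata_size: 128KB        # per-line byte budget for structured metadata
  max_structured_metadata_entries_count: 256 # per-line entry count budget
```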

## Querying structured metadata

Structured metadata is extracted automatically for each returned log line and added to the labels returned for the query.
You can use labels of structured metadata to filter log lines using a [label filter expression]({{< relref "../../query/log_queries#label-filter-expression" >}}).

For example, if you have a label `pod` attached to some of your log lines as structured metadata, you can filter log lines using:

```logql
{job="example"} | pod="myservice-abc1234-56789"
```

Of course, you can filter by multiple labels of structured metadata at the same time:

```logql
{job="example"} | pod="myservice-abc1234-56789" | trace_id="0242ac120002"
```

Note that because structured metadata is extracted automatically into the result labels, some metric queries might return an error like `maximum of series (50000) reached for a single query`. You can use the [Keep]({{< relref "../../query/log_queries#keep-labels-expression" >}}) and [Drop]({{< relref "../../query/log_queries#drop-labels-expression" >}}) stages to filter out labels that you don't need.
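For instance, a `keep` stage can restrict the result labels to the one you need — the label name below is hypothetical:

```logql
{job="example"} | keep trace_id
```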
5 changes: 1 addition & 4 deletions pkg/bloomgateway/processor.go
@@ -33,10 +33,7 @@ type processor struct {

func (p *processor) run(ctx context.Context, tasks []Task) error {
for ts, tasks := range group(tasks, func(t Task) model.Time { return t.day }) {
interval := bloomshipper.NewInterval(ts, ts.Add(Day))
tenant := tasks[0].Tenant
err := p.processTasks(ctx, tenant, interval, []v1.FingerprintBounds{{Min: 0, Max: math.MaxUint64}}, tasks)
if err != nil {
6 changes: 1 addition & 5 deletions pkg/bloomgateway/worker.go
@@ -160,11 +160,7 @@ func (w *worker) running(_ context.Context) error {
}

// interval is [Start, End)
interval := bloomshipper.NewInterval(day, day.Add(Day))
logger := log.With(w.logger, "day", day.Time(), "tenant", tasks[0].Tenant)
level.Debug(logger).Log("msg", "process tasks", "tasks", len(tasks))

Expand Down
5 changes: 3 additions & 2 deletions pkg/storage/bloom/v1/bounds.go
@@ -52,7 +52,7 @@ func (b FingerprintBounds) Hash(h hash.Hash32) error {
enc.PutBE64(uint64(b.Min))
enc.PutBE64(uint64(b.Max))
_, err := h.Write(enc.Get())
return errors.Wrap(err, "writing FingerprintBounds")
}

// Addr returns the string representation of the fingerprint bounds for use in
@@ -80,6 +80,7 @@ func (b FingerprintBounds) Cmp(fp model.Fingerprint) BoundsCheck {
return Overlap
}

// Overlaps returns whether the bounds (partially) overlap with the target bounds
func (b FingerprintBounds) Overlaps(target FingerprintBounds) bool {
return b.Cmp(target.Min) != After && b.Cmp(target.Max) != Before
}
@@ -89,7 +90,7 @@ func (b FingerprintBounds) Slice(min, max model.Fingerprint) *FingerprintBounds
return b.Intersection(FingerprintBounds{Min: min, Max: max})
}

// Within returns whether the fingerprint is fully within the target bounds
func (b FingerprintBounds) Within(target FingerprintBounds) bool {
return b.Min >= target.Min && b.Max <= target.Max
}
20 changes: 9 additions & 11 deletions pkg/storage/stores/shipper/bloomshipper/client.go
@@ -40,6 +40,10 @@ func (r Ref) Cmp(fp uint64) v1.BoundsCheck {
return r.Bounds.Cmp(model.Fingerprint(fp))
}

func (r Ref) Interval() Interval {
return NewInterval(r.StartTimestamp, r.EndTimestamp)
}

type BlockRef struct {
Ref
IndexPath string
@@ -268,19 +272,13 @@ func createMetaRef(objectKey string, tenantID string, tableName string) (MetaRef
}, nil
}

func tablesForRange(periodConfig config.PeriodConfig, interval Interval) []string {
step := int64(periodConfig.IndexTables.Period.Seconds())
lower := interval.Start.Unix() / step
upper := interval.End.Unix() / step
tables := make([]string, 0, 1+upper-lower)
for i := lower; i <= upper; i++ {
tables = append(tables, fmt.Sprintf("%s%d", periodConfig.IndexTables.Prefix, i))
}
return tables
}

3 changes: 2 additions & 1 deletion pkg/storage/stores/shipper/bloomshipper/client_test.go
@@ -73,8 +73,9 @@ func Test_BloomClient_FetchMetas(t *testing.T) {

searchParams := MetaSearchParams{
TenantID: "tenantA",

Keyspace: v1.NewBounds(50, 150),
Interval: NewInterval(fixedDay.Add(-6*day), fixedDay.Add(-1*day-1*time.Hour)),
}

fetched, err := store.FetchMetas(context.Background(), searchParams)
59 changes: 59 additions & 0 deletions pkg/storage/stores/shipper/bloomshipper/interval.go
@@ -0,0 +1,59 @@
package bloomshipper

import (
"fmt"
"hash"

"github.com/pkg/errors"
"github.com/prometheus/common/model"

v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
"github.com/grafana/loki/pkg/util/encoding"
)

// Interval defines a time range with start and end time
// where the start is inclusive, the end is non-inclusive.
type Interval struct {
Start, End model.Time
}

func NewInterval(start, end model.Time) Interval {
return Interval{Start: start, End: end}
}

func (i Interval) Hash(h hash.Hash32) error {
var enc encoding.Encbuf
enc.PutBE64(uint64(i.Start))
enc.PutBE64(uint64(i.End))
_, err := h.Write(enc.Get())
return errors.Wrap(err, "writing Interval")
}

func (i Interval) String() string {
// 13 digits are enough until Sat Nov 20 2286 17:46:39 UTC
return fmt.Sprintf("%013d-%013d", i.Start, i.End)
}

func (i Interval) Repr() string {
return fmt.Sprintf("[%s, %s)", i.Start.Time().UTC(), i.End.Time().UTC())
}

// Cmp returns the position of a time relative to the interval
func (i Interval) Cmp(ts model.Time) v1.BoundsCheck {
if ts.Before(i.Start) {
return v1.Before
} else if ts.After(i.End) || ts.Equal(i.End) {
return v1.After
}
return v1.Overlap
}

// Overlaps returns whether the interval overlaps (partially) with the target interval
func (i Interval) Overlaps(target Interval) bool {
return i.Cmp(target.Start) != v1.After && i.Cmp(target.End) != v1.Before
}

// Within returns whether the interval is fully within the target interval
func (i Interval) Within(target Interval) bool {
return i.Start >= target.Start && i.End <= target.End
}
50 changes: 50 additions & 0 deletions pkg/storage/stores/shipper/bloomshipper/interval_test.go
@@ -0,0 +1,50 @@
package bloomshipper

import (
"testing"
"time"

"github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"

v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
)

func Test_Interval_String(t *testing.T) {
start := model.Time(0)
end := model.TimeFromUnix(time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC).Unix())
interval := NewInterval(start, end)
assert.Equal(t, "0000000000000-1704067200000", interval.String())
assert.Equal(t, "[1970-01-01 00:00:00 +0000 UTC, 2024-01-01 00:00:00 +0000 UTC)", interval.Repr())
}

func Test_Interval_Cmp(t *testing.T) {
interval := NewInterval(10, 20)
assert.Equal(t, v1.Before, interval.Cmp(0))
assert.Equal(t, v1.Overlap, interval.Cmp(10))
assert.Equal(t, v1.Overlap, interval.Cmp(15))
assert.Equal(t, v1.After, interval.Cmp(20)) // End is not inclusive
assert.Equal(t, v1.After, interval.Cmp(21))
}

func Test_Interval_Overlap(t *testing.T) {
interval := NewInterval(10, 20)
assert.True(t, interval.Overlaps(Interval{Start: 5, End: 15}))
assert.True(t, interval.Overlaps(Interval{Start: 15, End: 25}))
assert.True(t, interval.Overlaps(Interval{Start: 10, End: 20}))
assert.True(t, interval.Overlaps(Interval{Start: 5, End: 25}))
assert.False(t, interval.Overlaps(Interval{Start: 1, End: 9}))
assert.False(t, interval.Overlaps(Interval{Start: 20, End: 30})) // End is not inclusive
assert.False(t, interval.Overlaps(Interval{Start: 25, End: 30}))
}

func Test_Interval_Within(t *testing.T) {
target := NewInterval(10, 20)
assert.False(t, NewInterval(1, 9).Within(target))
assert.False(t, NewInterval(21, 30).Within(target))
assert.True(t, NewInterval(10, 20).Within(target))
assert.True(t, NewInterval(14, 15).Within(target))
assert.False(t, NewInterval(5, 15).Within(target))
assert.False(t, NewInterval(15, 25).Within(target))
assert.False(t, NewInterval(5, 25).Within(target))
}
26 changes: 3 additions & 23 deletions pkg/storage/stores/shipper/bloomshipper/shipper.go
@@ -9,29 +9,11 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"

v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper/config"
)

type BlockQuerierWithFingerprintRange struct {
*v1.BlockQuerier
v1.FingerprintBounds
@@ -196,18 +178,16 @@ func BlocksForMetas(metas []Meta, interval Interval, keyspaces []v1.FingerprintB
// isOutsideRange tests if a given BlockRef b is outside of search boundaries
// defined by min/max timestamp and min/max fingerprint.
// Fingerprint ranges must be sorted in ascending order.
func isOutsideRange(b BlockRef, interval Interval, bounds []v1.FingerprintBounds) bool {
// check time interval
if !interval.Overlaps(b.Interval()) {
return true
}

// check fingerprint ranges
for _, keyspace := range bounds {
if keyspace.Overlaps(b.Bounds) {
return false
}

}

return true
