-
Notifications
You must be signed in to change notification settings - Fork 3.5k
/
Copy pathoss_object_client.go
231 lines (204 loc) · 7.62 KB
/
oss_object_client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
package alibaba
import (
"context"
"flag"
"io"
"net/http"
"strconv"
"github.com/aliyun/aliyun-oss-go-sdk/oss"
"github.com/grafana/dskit/instrument"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/grafana/loki/v3/pkg/storage/chunk/client"
"github.com/grafana/loki/v3/pkg/util/constants"
)
// NoSuchKeyErr is the OSS service error code that IsObjectNotFoundErr compares
// against (together with an HTTP 404 status) to recognise a missing object.
const NoSuchKeyErr = "NoSuchKey"
// ossRequestDuration is the histogram collector that every instrumented OSS
// call in this file reports into. Observations are labelled by operation name
// and status code, using seven exponential buckets that start at 5ms and grow
// by a factor of 4.
var ossRequestDuration = instrument.NewHistogramCollector(
	prometheus.NewHistogramVec(
		prometheus.HistogramOpts{
			Namespace: constants.Loki,
			Name:      "oss_request_duration_seconds",
			Help:      "Time spent doing OSS requests.",
			Buckets:   prometheus.ExponentialBuckets(0.005, 4, 7),
		},
		[]string{"operation", "status_code"},
	),
)
// init registers the OSS request-duration histogram when the package is loaded.
func init() {
	ossRequestDuration.Register()
}
// OssObjectClient is an object client backed by a single Alibaba Cloud OSS
// bucket.
type OssObjectClient struct {
	// defaultBucket is the bucket handle every operation of this client goes
	// through.
	defaultBucket *oss.Bucket
}
// OssConfig holds the settings needed to build the OSS chunk client.
type OssConfig struct {
	// Bucket is the name of the OSS bucket to operate on.
	Bucket string `yaml:"bucket"`
	// Endpoint is the OSS endpoint to connect to.
	Endpoint string `yaml:"endpoint"`
	// AccessKeyID and SecretAccessKey are the Alibaba Cloud credentials.
	AccessKeyID     string `yaml:"access_key_id"`
	SecretAccessKey string `yaml:"secret_access_key"`
	// ConnectionTimeoutSec is the connection timeout, in seconds.
	ConnectionTimeoutSec int64 `yaml:"conn_timeout_sec"`
	// ReadWriteTimeoutSec is the read/write timeout, in seconds.
	ReadWriteTimeoutSec int64 `yaml:"read_write_timeout_sec"`
}
// RegisterFlags registers the OSS flags on f without any name prefix.
func (cfg *OssConfig) RegisterFlags(f *flag.FlagSet) {
	const noPrefix = ""
	cfg.RegisterFlagsWithPrefix(noPrefix, f)
}
// RegisterFlagsWithPrefix registers the OSS flags on f, prepending prefix to
// every flag name. String settings default to the empty string; the connection
// and read/write timeouts default to 30 and 60 seconds respectively.
func (cfg *OssConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
	stringFlags := []struct {
		target *string
		name   string
		usage  string
	}{
		{&cfg.Bucket, "oss.bucketname", "Name of OSS bucket."},
		{&cfg.Endpoint, "oss.endpoint", "oss Endpoint to connect to."},
		{&cfg.AccessKeyID, "oss.access-key-id", "alibabacloud Access Key ID"},
		{&cfg.SecretAccessKey, "oss.secret-access-key", "alibabacloud Secret Access Key"},
	}
	for _, sf := range stringFlags {
		f.StringVar(sf.target, prefix+sf.name, "", sf.usage)
	}

	timeoutFlags := []struct {
		target *int64
		name   string
		value  int64
		usage  string
	}{
		{&cfg.ConnectionTimeoutSec, "oss.conn-timeout-sec", 30, "Connection timeout in seconds"},
		{&cfg.ReadWriteTimeoutSec, "oss.read-write-timeout-sec", 60, "Read/Write timeout in seconds"},
	}
	for _, tf := range timeoutFlags {
		f.Int64Var(tf.target, prefix+tf.name, tf.value, tf.usage)
	}
}
// Validate checks the configuration and returns an error when the read/write
// timeout is not strictly positive. No other field is checked.
func (cfg *OssConfig) Validate() error {
	if cfg.ReadWriteTimeoutSec > 0 {
		return nil
	}
	return errors.New("read write timeout must be greater than 0")
}
// NewOssObjectClient builds an object client that stores chunks in the OSS
// bucket named by cfg.Bucket. It creates an OSS client for cfg.Endpoint using
// the configured credentials and timeouts, then resolves the bucket handle.
// The context argument is unused, and cfg.Validate is not called here.
func NewOssObjectClient(_ context.Context, cfg OssConfig) (client.ObjectClient, error) {
	timeouts := oss.Timeout(cfg.ConnectionTimeoutSec, cfg.ReadWriteTimeoutSec)

	ossClient, err := oss.New(cfg.Endpoint, cfg.AccessKeyID, cfg.SecretAccessKey, timeouts)
	if err != nil {
		return nil, err
	}

	bucket, err := ossClient.Bucket(cfg.Bucket)
	if err != nil {
		return nil, err
	}

	return &OssObjectClient{defaultBucket: bucket}, nil
}
// Stop is a no-op: the body is empty, so calling it releases nothing.
// NOTE(review): presumably present to satisfy client.ObjectClient — confirm.
func (s *OssObjectClient) Stop() {
}
// ObjectExists reports whether objectKey is present in the bucket. It performs
// a metadata lookup; a not-found error (as judged by IsObjectNotFoundErr) is
// turned into (false, nil), while any other error is returned to the caller.
func (s *OssObjectClient) ObjectExists(ctx context.Context, objectKey string) (bool, error) {
	_, err := s.objectAttributes(ctx, objectKey, "OSS.ObjectExists")
	switch {
	case err == nil:
		return true, nil
	case s.IsObjectNotFoundErr(err):
		return false, nil
	default:
		return false, err
	}
}
// GetAttributes returns the attributes of objectKey, delegating to
// objectAttributes under the "OSS.GetAttributes" operation name.
func (s *OssObjectClient) GetAttributes(ctx context.Context, objectKey string) (client.ObjectAttributes, error) {
	const operation = "OSS.GetAttributes"
	return s.objectAttributes(ctx, objectKey, operation)
}
// objectAttributes fetches the metadata headers of objectKey and returns its
// size, timing the request under the given operation name. On a request error
// the zero ObjectAttributes is returned together with that error.
func (s *OssObjectClient) objectAttributes(ctx context.Context, objectKey, operation string) (client.ObjectAttributes, error) {
	var (
		opts []oss.Option
		size int64
	)

	fetchMeta := func(_ context.Context) error {
		meta, metaErr := s.defaultBucket.GetObjectMeta(objectKey, opts...)
		if metaErr != nil {
			return metaErr
		}
		// The parse error is deliberately discarded: a missing or non-numeric
		// Content-Length header results in a reported size of 0.
		size, _ = strconv.ParseInt(meta.Get(oss.HTTPHeaderContentLength), 10, 64)
		return nil
	}

	if err := instrument.CollectedRequest(ctx, operation, ossRequestDuration, instrument.ErrorCode, fetchMeta); err != nil {
		return client.ObjectAttributes{}, err
	}
	return client.ObjectAttributes{Size: size}, nil
}
// GetObject returns a reader and the size for the specified object key from
// the configured OSS bucket.
//
// The size is taken from the response's Content-Length header. If the request
// fails, or the header cannot be parsed as a 64-bit integer, a nil reader and
// the error are returned. The caller owns the returned reader and must close
// it.
func (s *OssObjectClient) GetObject(ctx context.Context, objectKey string) (io.ReadCloser, int64, error) {
	var resp *oss.GetObjectResult
	var options []oss.Option
	err := instrument.CollectedRequest(ctx, "OSS.GetObject", ossRequestDuration, instrument.ErrorCode, func(_ context.Context) error {
		var requestErr error
		resp, requestErr = s.defaultBucket.DoGetObject(&oss.GetObjectRequest{ObjectKey: objectKey}, options)
		return requestErr
	})
	if err != nil {
		return nil, 0, err
	}

	// Parse as int64 rather than int so that objects of 2 GiB or more do not
	// overflow on platforms where int is 32 bits wide.
	size, err := strconv.ParseInt(resp.Response.Headers.Get("Content-Length"), 10, 64)
	if err != nil {
		// The body is not handed to the caller on this path, so close it here
		// to avoid leaking the underlying connection.
		_ = resp.Response.Body.Close()
		return nil, 0, err
	}
	return resp.Response.Body, size, nil
}
// GetObjectRange returns a reader for part of the specified object key from
// the configured OSS bucket. The request asks for the byte range running from
// offset through offset+length-1. The caller owns the returned reader and must
// close it.
//
// The request is recorded under the same "OSS.GetObject" operation label that
// GetObject uses.
func (s *OssObjectClient) GetObjectRange(ctx context.Context, objectKey string, offset, length int64) (io.ReadCloser, error) {
	var resp *oss.GetObjectResult
	options := []oss.Option{
		oss.Range(offset, offset+length-1),
	}
	err := instrument.CollectedRequest(ctx, "OSS.GetObject", ossRequestDuration, instrument.ErrorCode, func(_ context.Context) error {
		var requestErr error
		resp, requestErr = s.defaultBucket.DoGetObject(&oss.GetObjectRequest{ObjectKey: objectKey}, options)
		return requestErr
	})
	if err != nil {
		return nil, err
	}
	return resp.Response.Body, nil
}
// PutObject uploads the contents of object to the configured OSS bucket under
// objectKey. An upload failure is returned wrapped with the message
// "failed to put oss object".
func (s *OssObjectClient) PutObject(ctx context.Context, objectKey string, object io.Reader) error {
	upload := func(_ context.Context) error {
		putErr := s.defaultBucket.PutObject(objectKey, object)
		if putErr == nil {
			return nil
		}
		return errors.Wrap(putErr, "failed to put oss object")
	}
	return instrument.CollectedRequest(ctx, "OSS.PutObject", ossRequestDuration, instrument.ErrorCode, upload)
}
// List implements chunk.ObjectClient. It pages through the bucket listing for
// the given prefix and delimiter, collecting every object (key and
// last-modified time) and every non-empty common prefix. The context is
// checked before each page request; if it is done, its error is returned and
// the partial results are dropped.
func (s *OssObjectClient) List(ctx context.Context, prefix, delimiter string) ([]client.StorageObject, []client.StorageCommonPrefix, error) {
	var (
		found    []client.StorageObject
		prefixes []client.StorageCommonPrefix
	)

	// An empty marker starts the listing from the beginning.
	nextMarker := ""
	for {
		if ctxErr := ctx.Err(); ctxErr != nil {
			return nil, nil, ctxErr
		}

		page, err := s.defaultBucket.ListObjects(oss.Prefix(prefix), oss.Delimiter(delimiter), oss.Marker(nextMarker))
		if err != nil {
			return nil, nil, errors.Wrap(err, "list alibaba oss bucket failed")
		}
		nextMarker = page.NextMarker

		for i := range page.Objects {
			found = append(found, client.StorageObject{
				Key:        page.Objects[i].Key,
				ModifiedAt: page.Objects[i].LastModified,
			})
		}

		for _, commonPrefix := range page.CommonPrefixes {
			if commonPrefix == "" {
				continue
			}
			prefixes = append(prefixes, client.StorageCommonPrefix(commonPrefix))
		}

		// A page that is not truncated is the last one.
		if !page.IsTruncated {
			return found, prefixes, nil
		}
	}
}
// DeleteObject removes objectKey from the configured OSS bucket, timing the
// request under the "OSS.DeleteObject" operation name. Any error from the
// bucket call is returned unchanged.
func (s *OssObjectClient) DeleteObject(ctx context.Context, objectKey string) error {
	remove := func(_ context.Context) error {
		return s.defaultBucket.DeleteObject(objectKey)
	}
	return instrument.CollectedRequest(ctx, "OSS.DeleteObject", ossRequestDuration, instrument.ErrorCode, remove)
}
// IsObjectNotFoundErr returns true if error means that object is not found.
// Relevant to GetObject and DeleteObject operations.
//
// An error qualifies when it is, or wraps, an oss.ServiceError whose code is
// NoSuchKeyErr and whose HTTP status is 404. A nil error, or any other error,
// yields false.
func (s *OssObjectClient) IsObjectNotFoundErr(err error) bool {
	// errors.As walks the wrap chain, so a ServiceError that a caller has
	// wrapped with extra context is still recognised.
	var svcErr oss.ServiceError
	if !errors.As(err, &svcErr) {
		return false
	}
	return svcErr.Code == NoSuchKeyErr && svcErr.StatusCode == http.StatusNotFound
}
// IsRetryableErr reports whether the given error should be retried. It
// currently ignores its argument and returns false for every error.
// TODO(dannyk): implement for client
func (s *OssObjectClient) IsRetryableErr(error) bool { return false }