diff --git a/cmd/Caddyfile b/cmd/Caddyfile index 34a67ed..dff285d 100644 --- a/cmd/Caddyfile +++ b/cmd/Caddyfile @@ -3,6 +3,7 @@ # debug order http_cache before reverse_proxy admin 0.0.0.0:7777 + } :9991 { @@ -19,6 +20,8 @@ } + + log { output file /tmp/logs/caddy/access.log format console diff --git a/cmd/main.go b/cmd/main.go index 7febce9..5bfbe06 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -30,10 +30,10 @@ import ( caddycmd "github.com/caddyserver/caddy/v2/cmd" // plug in Caddy modules here + _ "github.com/caddyserver/caddy/v2/modules/standard" _ "github.com/sillygod/cdp-cache" _ "github.com/sillygod/cdp-cache/extends/influxlog" - - _ "github.com/caddyserver/caddy/v2/modules/standard" + _ "github.com/sillygod/cdp-cache/extends/storage" ) func main() { diff --git a/example/consul_storage/Caddyfile b/example/consul_storage/Caddyfile new file mode 100644 index 0000000..197b691 --- /dev/null +++ b/example/consul_storage/Caddyfile @@ -0,0 +1,36 @@ +{ + order http_cache before reverse_proxy + admin 0.0.0.0:7777 + debug + + storage consul { + addr "consul:9500" + token "" + key_prefix "caddy_https" + } + +} + +:9991 { + reverse_proxy { + to localhost:9995 + } + + http_cache { + cache_type in_memory + match_path / + + default_max_age 1m + } + +} + +:9995 { + header Cache-control "public" + root * /tmp/caddy-benchmark + file_server + + log { + level info + } +} \ No newline at end of file diff --git a/example/distributed_cache/readme.org b/example/distributed_cache/readme.org index bb6de8e..fdf1ab7 100644 --- a/example/distributed_cache/readme.org +++ b/example/distributed_cache/readme.org @@ -1,17 +1,16 @@ * Distributed Cache Example - + Note! It's still under development. A lot of issues remain to be solved. - * Experiment Before we start, spin up your environment with ~PROJECT_PATH=/app docker-compose --project-directory=./ -f example/distributed_cache/docker-compose.yaml up~. Then, provision the test data with the helper scripts in the section below. ~docker exec -w /app/benchmark file bash provision.sh~ provisions the test data to the container named file, which exposes port 9995. - + Now test whether the caches are synchronized. First, run ~curl http://localhost:9991/pg31674.txt~. It should go to the source (file) for the content and then cache it. Second, run ~curl http://localhost:9992/pg31674.txt~. This should fetch the cached content from the peer if everything is on track. - + #+begin_src sh uri=("http://localhost:9991/pg31674.txt" "http://localhost:9992/pg31674.txt" "http://localhost:9993/pg31674.txt" "http://localhost:9994/pg31674.txt") for i in "${uri[@]}" do curl $i done #+end_src * Helper scripts - + Provision the test data with one of the following two commands. - + #+begin_src sh PROJECT_PATH=/app docker-compose --project-directory=./ -f example/distributed_cache/docker-compose.yaml exec -w /app/provision cdp "bash provision.sh" #+end_src - + #+begin_src sh docker exec -w /app/benchmark file bash provision.sh #+end_src @@ -35,4 +34,4 @@ If you want to run an interactive shell for development, the following command will do it for you. 
#+begin_src sh PROJECT_PATH=/app docker-compose --project-directory=./ -f example/distributed_cache/docker-compose.yaml run -p9992:9991 --entrypoint="/bin/bash" cdp2 - #+end_src + #+end_src diff --git a/extends/storage/caddyfile.go b/extends/storage/caddyfile.go new file mode 100644 index 0000000..8caa171 --- /dev/null +++ b/extends/storage/caddyfile.go @@ -0,0 +1,62 @@ +package mystorage + +import ( + "github.com/caddyserver/caddy/v2/caddyconfig/caddyfile" +) + +const ( + keyAddr = "addr" + keyToken = "token" + keyKeyPrefix = "key_prefix" ) + +// Config is the configuration for the consul storage +type Config struct { + Addr string `json:"addr,omitempty"` + Token string `json:"token,omitempty"` + KeyPrefix string `json:"key_prefix,omitempty"` +} + +func getDefaultConfig() *Config { + return &Config{ + KeyPrefix: "_consul_cert_", + Addr: "localhost:8500", + } +} + +// UnmarshalCaddyfile deserializes Caddyfile tokens into the consul storage's config +// storage consul { +// addr +// token +// key_prefix +// } +func (s *Storage) UnmarshalCaddyfile(d *caddyfile.Dispenser) error { + config := getDefaultConfig() + + for d.Next() { + for d.NextBlock(0) { + parameter := d.Val() + args := d.RemainingArgs() + + switch parameter { + case keyAddr: + config.Addr = args[0] + case keyToken: + config.Token = args[0] + case keyKeyPrefix: + config.KeyPrefix = args[0] + + default: + return d.Errf("unrecognized subdirective %s", parameter) + + } + + } + } + s.Config = config + return nil +} + +var ( + _ caddyfile.Unmarshaler = (*Storage)(nil) +) diff --git a/extends/storage/consul.go b/extends/storage/consul.go new file mode 100644 index 0000000..40f5e85 --- /dev/null +++ b/extends/storage/consul.go @@ -0,0 +1,238 @@ +package mystorage + +import ( + "context" + "fmt" + "path" + "strings" + + "github.com/caddyserver/caddy/v2" + "github.com/caddyserver/certmagic" + "github.com/hashicorp/consul/api" +) + +func init() { + caddy.RegisterModule(Storage{}) +} + +// Storage implements the certmagic.Storage interface. +// It holds the consul client and KV store +type Storage struct { + Client *api.Client + locks map[string]*api.Lock + KV *api.KV + Config *Config +} + +// CaddyModule returns the Caddy module information +func (Storage) CaddyModule() caddy.ModuleInfo { + return caddy.ModuleInfo{ + ID: "caddy.storage.consul", + New: func() caddy.Module { return new(Storage) }, + } +} + +// CertMagicStorage converts the storage to a certmagic.Storage +func (s *Storage) CertMagicStorage() (certmagic.Storage, error) { + return s, nil +} + +// Provision initializes the storage +func (s *Storage) Provision(ctx caddy.Context) error { + if s.Config == nil { + s.Config = getDefaultConfig() + } + + config := api.DefaultConfig() + config.Address = s.Config.Addr + config.Token = s.Config.Token + + client, err := api.NewClient(config) + if err != nil { + return err + } + + s.Client = client + if _, err := s.Client.Agent().NodeName(); err != nil { + return fmt.Errorf("err: %s, unable to ping consul", err.Error()) + } + + s.KV = s.Client.KV() + s.locks = make(map[string]*api.Lock) + + return nil +} + +// Validate checks the resource is set up correctly +func (s *Storage) Validate() error { + return nil +} + +// Cleanup releases the held resources +func (s *Storage) Cleanup() error { + return nil +} + +func (s *Storage) generateKey(key string) string { + // https://www.consul.io/commands/kv/get + return path.Join(s.Config.KeyPrefix, key) +} + +// Store writes the value at key into consul's KV store +func (s *Storage) Store(key string, value []byte) 
error { + kv := &api.KVPair{Key: s.generateKey(key), Value: value} + + if _, err := s.KV.Put(kv, nil); err != nil { + return fmt.Errorf("unable to store data: %s, key: %s", err.Error(), key) + } + + return nil +} + +// Load retrieves the value at key. +func (s *Storage) Load(key string) ([]byte, error) { + kv, _, err := s.KV.Get(s.generateKey(key), &api.QueryOptions{RequireConsistent: true}) + if err != nil { + return nil, fmt.Errorf("unable to get data: %s, key: %s", err.Error(), s.generateKey(key)) + } + + if kv == nil { + return nil, certmagic.ErrNotExist(fmt.Errorf("key: %s does not exist", s.generateKey(key))) + } + + return kv.Value, nil +} + +// Delete deletes key. An error should be +// returned only if the key still exists +// when the method returns. +func (s *Storage) Delete(key string) error { + kv, _, err := s.KV.Get(s.generateKey(key), &api.QueryOptions{RequireConsistent: true}) + if err != nil { + return fmt.Errorf("unable to get data: %s, key: %s", err.Error(), s.generateKey(key)) + } + + // a nil pair would make DeleteCAS panic, so report a missing key explicitly + if kv == nil { + return certmagic.ErrNotExist(fmt.Errorf("key: %s does not exist", s.generateKey(key))) + } + + success, _, err := s.KV.DeleteCAS(kv, nil) + if err != nil { + return fmt.Errorf("unable to delete data: %s, key: %s", err.Error(), s.generateKey(key)) + } + + if !success { + return fmt.Errorf("failed to delete data, key: %s", s.generateKey(key)) + } + + return nil +} + +// Exists returns true if the key exists +// and there was no error checking. +func (s *Storage) Exists(key string) bool { + kv, _, err := s.KV.Get(s.generateKey(key), &api.QueryOptions{RequireConsistent: true}) + return kv != nil && err == nil +} + +// List returns all keys that match prefix. +// If recursive is true, non-terminal keys +// will be enumerated (i.e. "directories" +// should be walked); otherwise, only keys +// prefixed exactly by prefix will be listed. +func (s *Storage) List(prefix string, recursive bool) ([]string, error) { + resultKeys := []string{} + + keys, _, err := s.KV.Keys(s.generateKey(prefix), "", &api.QueryOptions{RequireConsistent: true}) + if err != nil { + return resultKeys, err + } + + if len(keys) == 0 { + return resultKeys, certmagic.ErrNotExist(fmt.Errorf("no key at %s", prefix)) + } + + if recursive { + resultKeys = append(resultKeys, keys...) + return resultKeys, nil + } + + // process non-recursive result + // consul returns keys with the storage's key prefix applied, + // so trim the generated key rather than the raw prefix + keyMaps := map[string]struct{}{} + for _, key := range keys { + dir := strings.Split(strings.TrimPrefix(key, s.generateKey(prefix)+"/"), "/") + keyMaps[dir[0]] = struct{}{} + } + + for key := range keyMaps { + resultKeys = append(resultKeys, path.Join(prefix, key)) + } + + return resultKeys, nil +} + +// Stat returns information about key. 
+func (s *Storage) Stat(key string) (certmagic.KeyInfo, error) { + kv, _, err := s.KV.Get(s.generateKey(key), &api.QueryOptions{RequireConsistent: true}) + if err != nil { + return certmagic.KeyInfo{}, fmt.Errorf("unable to get data: %s, key: %s", err.Error(), s.generateKey(key)) + } + + if kv == nil { + return certmagic.KeyInfo{}, certmagic.ErrNotExist(fmt.Errorf("key: %s does not exist", s.generateKey(key))) + } + + // consul's KV store does not track a modification time, so Modified is left as its zero value + return certmagic.KeyInfo{ + Key: key, + Size: int64(len(kv.Value)), + IsTerminal: false, + }, nil +} + +// Lock locks key +func (s *Storage) Lock(ctx context.Context, key string) error { + if _, exists := s.locks[key]; exists { + return nil + } + + lock, err := s.Client.LockKey(s.generateKey(key)) + if err != nil { + return fmt.Errorf("err: %s, could not create lock for key: %s", err.Error(), s.generateKey(key)) + } + + lockCh, err := lock.Lock(ctx.Done()) + if err != nil { + return fmt.Errorf("err: %s, unable to lock: %s", err.Error(), s.generateKey(key)) + } + + s.locks[key] = lock + + go func() { + <-lockCh + s.Unlock(key) + }() + + return nil +} + +// Unlock unlocks key +func (s *Storage) Unlock(key string) error { + lock, exists := s.locks[key] + if !exists { + return fmt.Errorf("lock key: %s not found", s.generateKey(key)) + } + + err := lock.Unlock() + if err != nil { + return fmt.Errorf("unable to unlock: %s, key: %s", err.Error(), s.generateKey(key)) + } + + delete(s.locks, key) + return nil +} + +var ( + _ caddy.Provisioner = (*Storage)(nil) + _ caddy.CleanerUpper = (*Storage)(nil) + _ caddy.Validator = (*Storage)(nil) + _ certmagic.Storage = (*Storage)(nil) + _ certmagic.Locker = (*Storage)(nil) + _ caddy.StorageConverter = (*Storage)(nil) +) diff --git a/extends/storage/consul_test.go b/extends/storage/consul_test.go new file mode 100644 index 0000000..37abf42 --- /dev/null +++ b/extends/storage/consul_test.go @@ -0,0 +1,155 @@ +package mystorage + +import ( + "context" + "fmt" + "log" + "testing" + + "github.com/caddyserver/caddy/v2" + "github.com/caddyserver/caddy/v2/caddyconfig/caddyfile" + "github.com/caddyserver/caddy/v2/caddyconfig/httpcaddyfile" + "github.com/ory/dockertest/v3" + "github.com/stretchr/testify/suite" +) + +type StorageConsulTestSuite struct { + suite.Suite + pool *dockertest.Pool + resource *dockertest.Resource + sg *Storage +} + +func (suite *StorageConsulTestSuite) initConsulSg(port string) error { + h := httpcaddyfile.Helper{ + Dispenser: caddyfile.NewTestDispenser(fmt.Sprintf(` + { + storage consul { + addr "localhost:%s" + key_prefix "caddy_https" + } + } + `, port)), + } + suite.sg = new(Storage) + + if err := suite.sg.UnmarshalCaddyfile(h.Dispenser); err != nil { + return err + } + + if err := suite.sg.Provision(caddy.Context{}); err != nil { + return err + } + + return nil +} + +func (suite *StorageConsulTestSuite) SetupSuite() { + pool, err := dockertest.NewPool("") + if err != nil { + log.Fatal(err) + } + + consulVersion := "1.9.5" + resource, err := pool.Run("consul", consulVersion, []string{}) + if err != nil { + log.Fatal(err) + } + + suite.pool = pool + suite.resource = resource + port := resource.GetPort("8500/tcp") + + if err := suite.pool.Retry(func() error { + return suite.initConsulSg(port) + }); err != nil { + log.Fatal(err) + } + +} + +func (suite *StorageConsulTestSuite) TearDownSuite() { + if err := suite.pool.Purge(suite.resource); err != nil { + log.Fatal(err) + } +} + +func (suite *StorageConsulTestSuite) TestStore() { + + testData := []string{"hi", "hi/people"} + + for _, data := range testData { + err 
:= suite.sg.Store(data, []byte(`OOOK`)) + suite.Nil(err) + } + + res, err := suite.sg.List("hi", true) + suite.Nil(err) + + expectedRes := []string{} + for _, data := range testData { + key := suite.sg.generateKey(data) + expectedRes = append(expectedRes, key) + } + + suite.Equal(expectedRes, res) + +} + +func (suite *StorageConsulTestSuite) TestLoad() { + err := suite.sg.Store("hi", []byte(`OOOK`)) + suite.Nil(err) + value, err := suite.sg.Load("hi") + suite.Nil(err) + + suite.Equal([]byte(`OOOK`), value) +} + +func (suite *StorageConsulTestSuite) TestDelete() { + err := suite.sg.Store("hi", []byte(`OOOK`)) + suite.Nil(err) + err = suite.sg.Delete("hi") + suite.Nil(err) + exists := suite.sg.Exists("hi") + suite.False(exists) +} + +func (suite *StorageConsulTestSuite) TestStat() { + err := suite.sg.Store("hi", []byte(`OOOK`)) + suite.Nil(err) + info, err := suite.sg.Stat("hi") + suite.Nil(err) + suite.Equal("hi", info.Key) +} + +func (suite *StorageConsulTestSuite) TestList() { + err := suite.sg.Store("example.com", []byte(`OOOK`)) + suite.Nil(err) + + err = suite.sg.Store("example.com/xx.crt", []byte(`OOOK`)) + suite.Nil(err) + + err = suite.sg.Store("example.com/xx.csr", []byte(`OOOK`)) + suite.Nil(err) + + keys, err := suite.sg.List("example.com", true) + suite.Nil(err) + suite.Len(keys, 3) +} + +func (suite *StorageConsulTestSuite) TestLockUnlock() { + ctx := context.Background() + err := suite.sg.Lock(ctx, "example.com/lock") + suite.Nil(err) + err = suite.sg.Unlock("example.com/lock") + suite.Nil(err) +} + +func (suite *StorageConsulTestSuite) TestExist() { + err := suite.sg.Store("hi", []byte(`OOOK`)) + suite.Nil(err) + exists := suite.sg.Exists("hi") + suite.True(exists) +} + +func TestStorageConsulTestSuite(t *testing.T) { + suite.Run(t, new(StorageConsulTestSuite)) } diff --git a/go.mod b/go.mod index 14631be..3f558b1 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.16 require ( github.com/caddyserver/caddy/v2 v2.4.1 + github.com/caddyserver/certmagic v0.13.1 // indirect github.com/go-redis/redis v6.15.9+incompatible github.com/google/uuid v1.2.0 github.com/hashicorp/consul/api v1.8.1 diff --git a/readme.org b/readme.org index 29e19d4..2d01d05 100644 --- a/readme.org +++ b/readme.org @@ -84,7 +84,7 @@ Or you can use the [[https://github.com/caddyserver/xcaddy][xcaddy]] to build the executable file. Ensure you've installed it by =go get -u github.com/caddyserver/xcaddy/cmd/xcaddy= #+begin_src sh - xcaddy build v2.0.0 --with github.com/sillygod/cdp-cache + xcaddy build v2.4.1 --with github.com/sillygod/cdp-cache #+end_src Xcaddy also provides a way to develop plugins locally. @@ -174,7 +174,9 @@ ** Test Result - The following benchmark is analysized by =wrk -c 50 -d 30s --latency -t 4 http://localhost:9991/pg31674.txt= without log open. Before running this, ensure you provision the tests data by =bash benchmark/provision.sh= + The following benchmark was analyzed with =wrk -c 50 -d 30s --latency -t 4 http://localhost:9991/pg31674.txt= with logging disabled. + Before running this, ensure you provision the test data with =bash benchmark/provision.sh= + | | req/s | latency (50% 90% 99%) | | proxy + file cache | 13853 | 3.29ms / 4.09ms / 5.26ms | @@ -184,5 +186,3 @@ - [ ] distributed cache (in progress) - [ ] more optimization.. - - [ ] implement a consul-based storage for sharing certificates (https://github.com/pteich/caddy-tlsconsul) - https://github.com/caddyserver/certmagic/wiki/Storage-Implementations
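
Reviewer note: for anyone who wants to exercise the new ~storage consul~ directive without spinning up a full Caddy instance or a consul container, here is a minimal sketch of the Caddyfile round-trip, reusing the ~caddyfile.NewTestDispenser~ helper that the test suite above relies on. The ~main~ package and the sample values are illustrative only, not part of this change.

#+begin_src go
package main

import (
	"fmt"
	"log"

	"github.com/caddyserver/caddy/v2/caddyconfig/caddyfile"
	mystorage "github.com/sillygod/cdp-cache/extends/storage"
)

func main() {
	// Same block shape as the storage directive in example/consul_storage/Caddyfile.
	d := caddyfile.NewTestDispenser(`
	storage consul {
		addr "consul:9500"
		key_prefix "caddy_https"
	}`)

	var s mystorage.Storage
	if err := s.UnmarshalCaddyfile(d); err != nil {
		log.Fatal(err)
	}

	// addr and key_prefix override the defaults from getDefaultConfig;
	// token stays at its zero value because it is omitted here.
	fmt.Println(s.Config.Addr, s.Config.KeyPrefix) // consul:9500 caddy_https
}
#+end_src

Provision (and therefore a reachable consul agent) is only needed once you actually want to hit the KV store, as the test suite does via dockertest.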