Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Persistent Cache #1249

Closed
gaius-qi opened this issue Apr 14, 2022 · 8 comments
Closed

Persistent Cache #1249

gaius-qi opened this issue Apr 14, 2022 · 8 comments
Assignees
Milestone

Comments

@gaius-qi
Copy link
Member

gaius-qi commented Apr 14, 2022

背景

Dragonfly 现阶段是针对只读数据进行 P2P 分发。通过改造 Dragonfly 让它支持写入操作,就能扩展出更多应用场景。

需求

  • 支持分层持久化缓存, 两层缓存: Dragonfly 和 Backend。
  • 支持对 Backend 同步以及异步写操作。
  • 支持读操作。
  • Backend 支持 S3 和 OSS。
  • 数据元信息以 Backend 为准。

设计

20220425170837

Command

新增命令行工具 dfpcache (Dragonfly Persistent Cache)

Usage:
  dfpcache [command] [flags]

Available Commands:
  get         get a task in persistent cache
  create      create a task to persistent cache
  delete      delete a task in persistent cache
  doc         generate documents
  help        help about any command
  version     show version

Flags:
  -h, --help                         help for dfget

dfpcache get:

Usage:
  dfpcache get <file> [flags]

Available Commands:
  doc         generate documents
  help        help about any command
  version     show version

Flags:
  -o, --output                       write to file instead of stdout
  -t, --type                         backend type, now supports oss or s3 storage
  -aki, --accessKeyID                backend accesskey id
  -aks, --accessKeySecret            backend accesskey secret 
  -e, --endpoint                     backend endpoint
  -r, --region                       backend region
  -b, --bucket                       backend bucket
       --timeout                     timeout for persistent cache operation
       --tag                         business tag for task
       --workHome                    dfpcache workHome directory
  -h, --help                         help for dfget

dfpcache create:

Usage:
  dfpcache put <file> [flags]

Available Commands:
  doc         generate documents
  help        help about any command
  version     show version

Flags:
  -t, --type                         backend type, now supports oss and s3 storage
  -aki, --accessKeyID                backend accesskey id
  -aks, --accessKeySecret            backend accesskey secret 
  -e, --endpoint                     backend endpoint
  -r, --region                       backend region
  -b, --bucket                       backend bucket
  -m, --mode                         create task mode, now supports sync and async mode
       --backend-only                store only to backend
       --timeout                     timeout for persistent cache operation
       --tag                         business tag for task
       --workHome                    dfpcache workHome directory
  -h, --help                         help for dfget

dfpcache delete:

Usage:
  dfpcache delete <file> [flags]

Available Commands:
  doc         generate documents
  help        help about any command
  version     show version

Flags:
  -t, --type                         backend type, now supports oss or s3 storage
  -aki, --accessKeyID                backend accesskey id
  -aks, --accessKeySecret            backend accesskey secret 
  -e, --endpoint                     backend endpoint
  -r, --region                       backend region
  -b, --bucket                       backend bucket
       --timeout                     timeout for persistent cache operation
       --tag                         business tag for task
       --workHome                    dfpcache workHome directory
  -h, --help                         help for dfget

Configuration

dfpcache 支持配置如下:

type Config struct {
	// Backend configuration
	Backend *BackendConfig `yaml:"backend" mapstructure:"backend"`

	// Timeout for persistent cache operation
	Timeout time.Duration `yaml:"timeou,omitemptyt" mapstructure:"timeout,omitempty"`

	// WorkHome directory
	WorkHome string `yaml:"workHome,omitempty" mapstructure:"workHome,omitempty"`
}

type BackendConfig struct {
	// Backend type
	Type string `yaml:"type" mapstructure:"type"`

	// AccessKey ID
	AccessKeyID string `yaml:"accessKeyID" mapstructure:"accessKeyID"`

	// AccessKey Secret
	AccessKeySecret string `yaml:"accessKeySecret" mapstructure:"accessKeySecret"`

	// Backend endpoint
	Endpoint string `yaml:"endpoint" mapstructure:"endpoint"`

	// Backend region
	Region string `yaml:"region" mapstructure:"region"`

	// Backend bucket
	Bucket string `yaml:"bucket" mapstructure:"bucket"`
}

Interface

// dfpcache
type Dfpcache Interface {
        // Get downloads the persistent cache
        Get(ctx context.Context, key string, options ...GetOption) (io.ReadCloser, error)

        // GetToFile downloads the persistent cache to a local file
        GetToFile(ctx context.Context, key, filepath string, options ...GetOption) error

        // Create creates a new persistent cache
        Create(ctx context.Context, key string, reader io.ReadCloser, options ...CreateOption) error

        // Create creates a new persistent cache from a local file
        CreateFromFile(ctx context.Context, key, filepath string, options ...CreateOption) error

        // Update updates a new persistent cache
        Update(ctx context.Context, key string, reader io.ReadSeeder, options ...UpdateOption) error

        // Delete deletes the persistent cache
        Delete(ctx context.Context, key string) error
}

Matedata

type Task struct {
	// ID is task id
	ID string

	// URL is task download url
	URL string

	// Type is task type
	Type int

	// URLMeta is task download url meta
	URLMeta *base.UrlMeta

        // Backend information for dfpcache
        Backend *Backend

	// ContentLength is task total content length
	ContentLength *atomic.Int64

	// TotalPieceCount is total piece count
	TotalPieceCount *atomic.Int32

	// BackToSourceLimit is back-to-source limit
	BackToSourceLimit *atomic.Int32

	// BackToSourcePeers is back-to-source sync map
	BackToSourcePeers set.SafeSet

	// Task state machine
	FSM *fsm.FSM

	// Piece sync map
	Pieces *sync.Map

	// Peer sync map
	Peers *sync.Map

	// PeerCount is peer count
	PeerCount *atomic.Int32

	// PeerFailedCount is peer failed count,
	// if one peer succeeds, the value is reset to zero
	PeerFailedCount *atomic.Int32

	// CreateAt is task create time
	CreateAt *atomic.Time

	// UpdateAt is task update time
	UpdateAt *atomic.Time
}

type UrlMeta struct {
	// url tag identifies different task for same url, conflict with digest
	Tag string

	// other url header infos
	Header map[string]string
}

type Backend struct {
	// Backend type
	Type string

	// Backend endpoint
	Endpoint string 

	// Backend region
	Region string

	// Backend bucket
	Bucket string

	// Backend key
	Key string
}
  • Task Type 为 TaskTypeDfpcache
  • Peer 等同于数据存储位置。

Sequence Diagram

Get

默认 Backend 配置

  1. 命中 Dfdaemon 本地缓存

image

  1. 命中其他 Peer

image

  1. 命中 Backend

image

自定义 Backend 配置

  1. 命中 Dfdaemon 本地缓存

image

  1. 命中其他 Peer

image

  1. 命中 Backend

image

SignURL

  1. 命中 Dfdaemon 本地缓存

image

  1. 命中其他 Peer

image

  1. 命中 Backend

image

Create

默认 Bucket 配置

  1. 同步写

image

  1. 异步写

image

  1. 只写 Backend

image

自定义 Backend 配置

  1. 同步写

image

  1. 异步写

image

  1. 只写 Backend

image

Update

默认 Backend 配置

image

自定义 Backend 配置

image

Delete

默认 Backend 配置

image

自定义 Backend 配置

image

注意

  • 使用 Task ID 生成逻辑: EndpointRegionBucketKey 以及 MD5 做 SHA256。
  • 使用 S3 HeadObjectOSS GetObjectMeta 来校验 AK 、判断文件是否存在并且获取 MD5。
  • 到 Dfpcache 去 Backend 校验是必要的,因为如果 Cache 这部分校验结果,安全无法得到保证。
  • Range 下载,调度器优先匹配有跨 Range 的 Piece 先调度,如果不存在该 Piece 则触发当前 Dfdaemon 优先下载对应 Piece。

其他

Dragonfly 文件系统

现在最终数据信息是以 OSS 为准。当然还有另一套方案就是存储所有数据元信息以 Dragonfly 为准,不需要前置访问 OSS,数据元信息存储到 Redis 里,增删改查最终参考 Redis 内元信息。OSS 只做一个稳定的副本,这种方案需要:

  1. Dragonfly 由 Manager 来做认证和访问控制(用 Manager 来做是因为增删改查 AK,并且关联权限功能要有前端):
    a. 实现一套类阿里云 or AWS 的 AK 管理
    b. 实现一套 S3 or OSS 的 AK 认证逻辑。包括 SignURL 生成认证等逻辑。
    c. 实现一套访问控制(RAM),抽象 AK 和 Path 对应权限。
  2. 根据 Path 抽象一套类似 RegionBucket 以及 Key 逻辑。
  3. 数据元信息由 Scheduler 存储 Redis,但是持久化存储所以可能导致 Redis 内数据过多。
  4. 元信息需要增加一些类似 MD5。
  5. 数据每次更改需要重新计算 MD5,更新数据元信息。

设计

20220426111838

  1. OSS 数据仅为文件系统中的一个稳定数据备份。数据元信息都存储 Redis,AK 以及访问控制相关 Mysql。
  2. AK 认证和访问控制都走 Manager,且在 Manager 创建存储 NamespaceApplication,AK 关联具体 NamespaceApplication 的读写权限。

存储路径抽象

分为 NamespaceApplication 以及 Key, OSS 存储路径 ${Namespace}/${Application}/${Key}

Task ID 生成逻辑

使用 NamespaceApplication 以及 Key 做 SHA256。

调度器副本相关

  1. 调度器调度时判断 Peer 的 Digest 过滤掉非当前 Task 的 Digest 的 Peer 进行下载。
  2. 每次 Update Backend 后要清理 Task 下非 Digest 的 Peer 副本。
  3. Dfdaemon 增加主动 Leave Task 接口,调度器定时主动清理 Task 对应的 Peer 副本。

Configuration

通过 Manager 配置文件配置 Backend 信息。Manager 认证和鉴权做完后,生成 SignURL 提供给 Dfpcache 进行下载(当然是没有命中的 peer 副本时需要到 Backend 下载才会用到)。

type Config struct {
	// Backend configuration
	Backend *BackendConfig `yaml:"backend" mapstructure:"backend"`
}

type BackendConfig struct {
	// Backend type
	Type string `yaml:"type" mapstructure:"type"`

	// AccessKey ID
	AccessKeyID string `yaml:"accessKeyID" mapstructure:"accessKeyID"`

	// AccessKey Secret
	AccessKeySecret string `yaml:"accessKeySecret" mapstructure:"accessKeySecret"`

	// Backend endpoint
	Endpoint string `yaml:"endpoint" mapstructure:"endpoint"`

	// Backend region
	Region string `yaml:"region" mapstructure:"region"`

	// Backend bucket
	Bucket string `yaml:"bucket" mapstructure:"bucket"`
}

Metadata

type Task struct {
	// ID is task id
	ID string

	// URL is task download url
	URL string

	// Type is task type
	Type int

	// URLMeta is task download url meta
	URLMeta *base.UrlMeta

        // Backend information for dfpcache
        Backend *Backend

	// ContentLength is task total content length
	ContentLength *atomic.Int64

	// TotalPieceCount is total piece count
	TotalPieceCount *atomic.Int32

	// BackToSourceLimit is back-to-source limit
	BackToSourceLimit *atomic.Int32

	// BackToSourcePeers is back-to-source sync map
	BackToSourcePeers set.SafeSet

	// Task state machine
	FSM *fsm.FSM

	// Piece sync map
	Pieces *sync.Map

	// Peer sync map
	Peers *sync.Map

	// PeerCount is peer count
	PeerCount *atomic.Int32

	// PeerFailedCount is peer failed count,
	// if one peer succeeds, the value is reset to zero
	PeerFailedCount *atomic.Int32

	// CreateAt is task create time
	CreateAt *atomic.Time

	// UpdateAt is task update time
	UpdateAt *atomic.Time
}

type UrlMeta struct {
	// url tag identifies different task for same url, conflict with digest
	Tag string

	// other url header infos
	Header map[string]string
}

type Backend struct {
	// Object namespace
	Namespace string

	// Object application
	Application string

	// Object key
	Key string

        // Object content's MD5
        Digest string 

        // Object ETag
        ETag string

        // Object content length
        ContentLength int64

	// CreateAt is object create time
	CreateAt *atomic.Time

	// UpdateAt is object update time
	UpdateAt *atomic.Time
}

type Peer struct {
     	// ID is peer id
	ID string

	// Type is peer type
	Type string

	// Digest is peer content's MD5
	Digest string

	// BizTag is peer biz tag
	BizTag string

	// Pieces is piece bitset
	Pieces *bitset.BitSet

	// pieceCosts is piece downloaded time
	pieceCosts []int64

	// Stream is grpc stream instance
	Stream *atomic.Value

	// Task state machine
	FSM *fsm.FSM

	// Task is peer task
	Task *Task

	// Host is peer host
	Host *Host

	// Parent is peer parent
	Parent *atomic.Value

	// Children is peer children
	Children *sync.Map

	// ChildCount is child count
	ChildCount *atomic.Int32

	// StealPeers is steal peer ids
	StealPeers set.SafeSet

	// BlockPeers is bad peer ids
	BlockPeers set.SafeSet

	// NeedBackToSource needs downloaded from source
	//
	// When peer is registering, at the same time,
	// scheduler needs to create the new corresponding task and the cdn is disabled,
	// NeedBackToSource is set to true
	NeedBackToSource *atomic.Bool

	// IsBackToSource is downloaded from source
	//
	// When peer is scheduling and NeedBackToSource is true,
	// scheduler needs to return Code_SchedNeedBackSource and
	// IsBackToSource is set to true
	IsBackToSource *atomic.Bool

	// CreateAt is peer create time
	CreateAt *atomic.Time

	// UpdateAt is peer update time
	UpdateAt *atomic.Time
}

折中方案

也可以有一种折中的方案基于 OSS 做访问控制,Dragonfly 来保证存储所有数据元信息。问题在于需要前置访问 OSS 做校验,存储也必须按照 OSS 定义的 EndpointRegionBucket 以及 Key 来做。

参考

@gaius-qi gaius-qi added this to the v2.0.3 milestone Apr 14, 2022
@gaius-qi gaius-qi self-assigned this Apr 14, 2022
@gaius-qi gaius-qi changed the title [RFC] Dragonfly Writable Cache [RFC] Writable Cache Apr 14, 2022
@gaius-qi gaius-qi changed the title [RFC] Writable Cache [RFC] Tiered Cache Apr 14, 2022
@gaius-qi gaius-qi changed the title [RFC] Tiered Cache [RFC] Persistent Cache Apr 14, 2022
@eryugey
Copy link
Contributor

eryugey commented Apr 19, 2022

LGTM!

@gaius-qi
Copy link
Member Author

gaius-qi commented Apr 20, 2022

  • 只做持久化,用户必须感知到 backend 入参,去除 cache only 模式。
  • 根据对象存储 Bucket、Region 以及 Filepath 来做 TaskID。并且有新的 DownloadURL 字段作为真正下载的 URL。
  • 对象读:
    • 如果通过对象存储参数访问,则通过 TaskID 找 Local cache => Scheduler 找 P2P cache => 如果没有则生成 n 分钟过期加签 DownloadURL,然后访问 backend 进行下载(DownloadURL 过期则重新生成)。
    • 如果用户通过 URL 直接访问,先拿 DownloadURL,解析过期时间(https://help.aliyun.com/document_detail/31952.html)。防止每次 backend 检查。
      • 如果没过期,则直接通过 TaskID 找 Local cache => Scheduler 找 P2P cache => 如果都没有则直接通过用户传过来的 DownloadURL 进行下载。
      • 如果过期,先 Head 请求 backend 检查 URL 是否可用(是否存在?是否有权限?)。
        • 如果检查成功,则解析 URL 找到对应的 Task ID。然后通过 TaskID 找 Local cache => Scheduler 找 P2P cache => 如果都没有则直接通过用户传过来的 DownloadURL 进行下载。然后将 URL 存到 DownloadURL。
        • 如果检查不成功,直接报错。
  • 带 Range 的请求,需要跨 Piece 下载需要优化现有逻辑。
    • 对于 range 请求 client 上传 parent task id,scheduler 找 parent task id 有对应 piece 的 peer 优先返回。如果没有这个 piece 则触发 peer 优先下载这个 piece。
    • 对于对象访问,就是直接通过 Bucket、Region 以及 Filepath 做 TaskID, 找到的就是 parent task。
  • 对象写:
    • 异步写,允许 backend 写失败情况, 写 backend 失败也会返回成功。如果 backend 写失败,那么第一次访问,生成 DownloadURL 去 backend 检查 URL 是否可用的时候,会报错对象不存在(lazy request)。
  • dragonfly 中 dfpcache 元数据只存内存不持久化,最终以 backend 为准。
  • dfcache 元数据需要存 redis,并且以 redis 数据为准,并可以设置过期时间。

@anjia0532
Copy link
Contributor

LGTM

@eryugey
Copy link
Contributor

eryugey commented Apr 20, 2022

  • dragonfly 中 dfpcache 元数据只存内存不持久化,最终以 backend 为准。
  • dfcache 元数据需要存 redis,并且以 redis 数据为准,并可以设置过期时间。

这两个写反了吧?dfpcache的元数据需要redis持久化,dfcache可以不做持久化,只存内存就可以。但是dfcache的元数据最好可选用redis持久化,但是dfdaemon侧的dfcache缓存一定不能依赖redis,让dfdaemon storageManager自己管理,以减少延迟。

@gaius-qi
Copy link
Member Author

gaius-qi commented Apr 24, 2022

这两个写反了吧?dfpcache的元数据需要redis持久化,dfcache可以不做持久化,只存内存就可以。但是dfcache的元数据最好可选用redis持久化,但是dfdaemon侧的dfcache缓存一定不能依赖redis,让dfdaemon storageManager自己管理,以减少延迟。

dfpcache 不做临时 cache,只跟 backend 一致, 所以元数据跟之前一样存内存临时就可以。dfcache 选用 redis 持久化就可以。

@gaius-qi
Copy link
Member Author

gaius-qi commented Apr 26, 2022

结论:

  1. 现阶段满足 @bergwolf 按照第一种方案,数据最终以 Backend 为准,不做 Update 和 Append 需求。
  2. 长远目标跟 JuiceFS 结合,作为其读写的中间层。

@gaius-qi
Copy link
Member Author

gaius-qi commented Jun 5, 2022

需求

P1: 提供读能力。
P2: 提供写能力。

注意:

  1. 因为源站都是对象存储,可以直接基于 SignURL 拿对应 MD5 或 CRC64,然后解析 Object 唯一标识,组成 Task ID。所以即使没有写能力也可以利用 P2P,对文件读取进行加速。
  2. 如果增加写能力,则减少第一次回源造成的加载延迟。
  3. 需要依赖 Backend 保证数据可靠性。

文件系统

  1. 元数据:
    a. 由 juicefs 或 ulogfs 存储。

  2. 提供文件系统读支持:
    a. 提供 S3、OSS HTTP 协议读接口。

  3. 提供文件系统写支持(提供多副本存储能力, 提高 P2P 网络缓存命中率):
    a. juicefs: 代码包引入,juicefs 自己写入 S3 或 OSS,然后生成 Task ID 注入 Dragonfly。
    b. ulogfs: 提供 S3、OSS HTTP 协议写接口,由 Dragonfly 生成 Task ID。

  4. 热升级能力

提供对象存储

  1. 通过 minio 管理 S3 或 OSS。

  2. 提供文件系统读支持:
    a. 提供 S3、OSS HTTP 协议读接口。

时序图

Create

Async Mode

image

Sync Mode

image

Get

Hit Local Cache

image

Hit Peer Cache

image

Hit Backend

image

@gaius-qi gaius-qi modified the milestones: v2.0.3, v2.0.4 Jun 6, 2022
@gaius-qi
Copy link
Member Author

gaius-qi commented Jun 7, 2022

结论:

  1. dragonfly 提供 REST 接口读写文件,鉴权使用双向 TLS。
  2. 用户持有 Object ID 进行下载。
  3. OSS、S3 信息配置在 Dragonfly(Manager) 内。
  4. 写入 dfdaemon 后,scheduler 触发 seed peer 去 dfdaemon 下载内容。
  5. 异步 & 同步写 backend。
  6. 元数据以 OSS 或 S3 为准,scheduler 只做 cache 能命中就命中,命中不了就到 backend 去查看。删除的时候先删除 scheduler 的 task,再删除 backend 的 object。

@gaius-qi gaius-qi mentioned this issue Jul 7, 2022
8 tasks
@gaius-qi gaius-qi changed the title [RFC] Persistent Cache Persistent Cache Jul 7, 2022
@gaius-qi gaius-qi closed this as completed Jul 7, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants