Merge pull request #410 from ipfs-force-community/cherry-pick/405
Backports: #403 #348 #417 #435
0x5459 authored Sep 21, 2022
2 parents 737015b + a1b6e10 commit 21a6dd8
Showing 33 changed files with 664 additions and 299 deletions.
11 changes: 11 additions & 0 deletions CHANGELOG.zh.md
@@ -1,6 +1,17 @@
# Changelog

## v0.4.0-rc3
- venus-sector-manager
  - Handle messages in the ReplacedMsg state in the message on-chain tracking logic [#435](https://github.com/ipfs-force-community/venus-cluster/issues/435)
- venus-worker
  - Add an add_pieces external processor for concurrency control, preventing all add_pieces tasks from starting at once and exhausting memory. [#403](https://github.com/ipfs-force-community/venus-cluster/issues/403)
  - vc-processors: improve log output. [#348](https://github.com/ipfs-force-community/venus-cluster/issues/348)
    - Remove oversized log entries. [#417](https://github.com/ipfs-force-community/venus-cluster/pull/417)

## v0.4.0-rc2
- venus-sector-manager
  - Introduce venus v1.6.1; introduce the filecoin-ffi release with the memory-leak fix; introduce lotus v1.17.0 with compatibility fixes [#331](https://github.com/ipfs-force-community/venus-cluster/issues/331)

- venus-worker
  - Fix the segmentation fault reported by venus-worker-util hwinfo. [#341](https://github.com/ipfs-force-community/venus-cluster/issues/341)
  - Remove pre-allocation of NUMA-aware memory for PC1 to avoid potential risks; a better PC1 HugeTLB files feature is planned for venus-worker v0.5 [#350](https://github.com/ipfs-force-community/venus-cluster/issues/350).
18 changes: 15 additions & 3 deletions docs/en/03.venus-worker-config.md
Original file line number Diff line number Diff line change
@@ -70,6 +70,7 @@ location = "./mock-tmp/remote"
# c2 = 1
[processors.limitation.concurrent]
# add_pieces = 5
# pc1 = 3
# pc2 = 2
# c2 = 1
@@ -84,6 +85,9 @@ location = "./mock-tmp/remote"
[processors.static_tree_d]
# 2KiB = "./tmp/2k/sc-02-data-tree-d.dat"
# fields for the add_pieces processor
# [[processors.add_pieces]]
# fields for tree_d processor
[[processors.tree_d]]
@@ -420,7 +424,10 @@ It should be noted that when external processors are configured, the number of e

````toml
[processors.limitation.concurrent]
# tree_d task concurrency limit, optional, number type
# Concurrency limit for the add_pieces task, optional, number type
# add_pieces = 5

# Concurrency limit for the tree_d task, optional, number type
# tree_d = 1

# Concurrency limit for the pc1 task, optional, number type
@@ -532,19 +539,23 @@ This is the configuration group used to configure external processors.

Currently `{stage_name}` could be one of the following.

- `add_pieces` for add pieces phase
- `tree_d` for generation phase of Tree D
- `pc1` for the PreCommit1 phase
- `pc2` for the PreCommit2 phase
- `c2`: for Commit2 phase
- `transfer`: used to customize the transfer method between local data and persistent data storage


Each such configuration group means that an external processor of the corresponding sealing phase will be started. The optional configuration items include:
Each such configuration group translates to an external processor of the corresponding sealing phase being started. If nothing is configured for one of the above `{stage_name}` values and the corresponding `[[processors.{stage_name}]]` line does not exist in the configuration file,
then `venus-worker` will not start a child process for this `{stage_name}`. Instead, `venus-worker` will use the built-in processor code to execute the corresponding `{stage_name}` task directly in `sealing_thread`, which means the number of concurrent `{stage_name}` tasks depends on the number of `sealing_thread` instances and the `{stage_name}` concurrency limit configured in `[processors.limitation.concurrent]`. Not configuring an external processor saves extra steps such as serializing task parameters and task output, but loses capabilities such as stronger concurrency control, cgroup control, and custom proof algorithms. You are free to choose according to your own needs.
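As a sketch of this trade-off, assuming the section names shown in this document (the `bin` path is a placeholder): enabling a dedicated external processor for `add_pieces` while leaving `tree_d` to the built-in in-thread execution could look like this:

````toml
# Start one external add_pieces processor child process.
[[processors.add_pieces]]
# bin = "/usr/local/bin/my-add-pieces-plugin"   # hypothetical custom plugin path

# No [[processors.tree_d]] section here, so tree_d tasks run via the
# built-in processor inside each sealing_thread, bounded by the limit below.
[processors.limitation.concurrent]
add_pieces = 5
tree_d = 1
````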

The optional configuration items for `[[processors.{stage_name}]]` include:

````
[[processors.pc1]]
# Custom external processor executable file path, optional, string type
# By default, the executable file path corresponding to the main process will be used
# By default, the executable file path of the main process itself is used, running the venus-worker built-in processor
# bin = "./dist/bin/venus-worker-plugin-pc1"
# Customize the parameters of the external processor, optional item, string array type
@@ -586,6 +597,7 @@ Each such configuration group means that an external processor of the correspond

````toml
[processors.limitation.concurrent]
add_pieces = 8
pc1 = 4
pc2 = 2
c2 = 1
17 changes: 15 additions & 2 deletions docs/zh/03.venus-worker的配置解析.md
@@ -70,6 +70,7 @@ location = "./mock-tmp/remote"
# c2 = 1
[processors.limitation.concurrent]
# add_pieces = 5
# pc1 = 3
# pc2 = 2
# c2 = 1
@@ -84,6 +85,9 @@ location = "./mock-tmp/remote"
[processors.static_tree_d]
# 2KiB = "./tmp/2k/sc-02-data-tree-d.dat"
# fields for the add_pieces processor
# [[processors.add_pieces]]
# fields for tree_d processor
[[processors.tree_d]]
@@ -452,6 +456,9 @@ location = "/mnt/remote/10.0.0.14/store"

```
[processors.limitation.concurrent]
# Concurrency limit for the add_pieces stage, optional, number type
# add_pieces = 5
# Concurrency limit for the tree_d stage, optional, number type
# tree_d = 1
@@ -565,6 +572,7 @@ locks = ["gpu2"]

Currently `{stage_name}` could be one of the following.

- `add_pieces` for the Add pieces phase
- `tree_d` for the Tree D generation phase
- `pc1` for the PreCommit1 phase
- `pc2` for the PreCommit2 phase
@@ -575,12 +583,16 @@ locks = ["gpu2"]



Each such configuration group means an external processor for the corresponding phase will be started. The optional configuration items include:
Each such configuration group means an external processor for the corresponding phase will be started. If nothing is configured for one of the above `{stage_name}` values and the corresponding `[[processors.{stage_name}]]` line does not exist in the configuration,
`venus-worker` will not start a child process for this `{stage_name}`. Instead, `venus-worker` uses the built-in processor code to execute the corresponding `{stage_name}` task directly in `sealing_thread`. The number of concurrent `{stage_name}` tasks then depends on the number of `sealing_thread` instances and
the `{stage_name}` concurrency limit configured in `[processors.limitation.concurrent]`. Not configuring an external processor saves extra steps such as serializing task parameters and task output, but loses capabilities such as stronger concurrency control, cgroup control, and custom algorithms. Choose according to your own use case.

The optional configuration items for `[[processors.{stage_name}]]` include:

```
[[processors.pc1]]
# Custom external processor executable file path, optional, string type
# By default, the executable file path of the main process will be used
# By default, the executable file path of the main process is used, running the venus-worker built-in processor
# bin = "./dist/bin/venus-worker-plugin-pc1"
# Customize the parameters of the external processor, optional, string array type
@@ -623,6 +635,7 @@ locks = ["gpu2"]

```
[processors.limitation.concurrent]
add_pieces = 8
pc1 = 4
pc2 = 2
c2 = 1
@@ -213,7 +213,7 @@ WAIT_RET:
}

switch ret.State {
case messager.MessageState.OnChainMsg:
case messager.MessageState.OnChainMsg, messager.MessageState.ReplacedMsg:
mret = ret
break WAIT_RET

@@ -42,7 +42,7 @@ var utilMessageWaitCmd = &cli.Command{
}

fmt.Println("State:", msg.State.String())
if msg.State == messager.OnChainMsg {
if msg.State == messager.OnChainMsg || msg.State == messager.ReplacedMsg {
fmt.Println("Cid:", msg.SignedCid.String())
fmt.Println("Height:", msg.Height)
fmt.Println("Tipset:", msg.TipSetKey.String())
@@ -77,7 +77,7 @@ var utilMessageSearchCmd = &cli.Command{
}

fmt.Println("State:", msg.State.String())
if msg.State == messager.OnChainMsg {
if msg.State == messager.OnChainMsg || msg.State == messager.ReplacedMsg {
fmt.Println("Cid:", msg.SignedCid.String())
fmt.Println("Height:", msg.Height)
fmt.Println("Tipset:", msg.TipSetKey.String())
4 changes: 2 additions & 2 deletions venus-sector-manager/modules/impl/commitmgr/commitmgr.go
@@ -765,15 +765,15 @@ func (c *CommitmentMgrImpl) handleMessage(ctx context.Context, mid abi.ActorID,
var maybeMsg *string
if msg.Receipt != nil && len(msg.Receipt.Return) > 0 {
msgRet := string(msg.Receipt.Return)
if msg.State != messager.MessageState.OnChainMsg {
if msg.State != messager.MessageState.OnChainMsg && msg.State != messager.MessageState.ReplacedMsg {
mlog.Warnf("MAYBE WARN from off-chain msg receipt: %s", msgRet)
}

maybeMsg = &msgRet
}

switch msg.State {
case messager.MessageState.OnChainMsg:
case messager.MessageState.OnChainMsg, messager.MessageState.ReplacedMsg:
confidence := c.cfg.MustMinerConfig(mid).Commitment.Confidence
if msg.Confidence < confidence {
return core.OnChainStatePacked, maybeMsg
4 changes: 2 additions & 2 deletions venus-sector-manager/modules/impl/sectors/snapup_commit.go
@@ -463,12 +463,12 @@ func (h *snapupCommitHandler) waitForMessage() error {
}

var maybeMsg string
if msg.State != messager.MessageState.OnChainMsg && msg.Receipt != nil && len(msg.Receipt.Return) > 0 {
if msg.State != messager.MessageState.OnChainMsg && msg.State != messager.MessageState.ReplacedMsg && msg.Receipt != nil && len(msg.Receipt.Return) > 0 {
maybeMsg = string(msg.Receipt.Return)
}

switch msg.State {
case messager.MessageState.OnChainMsg:
case messager.MessageState.OnChainMsg, messager.MessageState.ReplacedMsg:
if msg.Confidence < int64(mcfg.SnapUp.MessageConfidential) {
return newTempErr(errMsgNotLanded, mcfg.SnapUp.Retry.PollInterval.Std())
}
2 changes: 1 addition & 1 deletion venus-sector-manager/pkg/messager/api.go
@@ -25,7 +25,7 @@ var MessageState = struct {
FillMsg,
OnChainMsg,
FailedMsg,
ReplacedMsg,
ReplacedMsg, // landed on-chain after being replaced by an off-chain service, usually via `mpool replace`, e.g. `venus mpool replace`
NoWalletMsg mtypes.MessageState
}{
mtypes.UnKnown,
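The pattern these Go diffs repeat — treating `ReplacedMsg` the same as `OnChainMsg` wherever a message is checked for having landed — can be sketched as a small helper. The state values mirror `venus-sector-manager/pkg/messager/api.go`; the `landed` helper itself is a hypothetical illustration, not part of this change:

```go
package main

import "fmt"

// MessageState mirrors the messager states referenced in the diffs above.
type MessageState int

const (
	UnKnown MessageState = iota
	UnFillMsg
	FillMsg
	OnChainMsg
	FailedMsg
	ReplacedMsg
	NoWalletMsg
)

// landed reports whether a message has made it on-chain, counting a message
// that was replaced off-chain (e.g. via `venus mpool replace`) and then
// landed as on-chain too.
func landed(s MessageState) bool {
	return s == OnChainMsg || s == ReplacedMsg
}

func main() {
	fmt.Println(landed(OnChainMsg))  // true
	fmt.Println(landed(ReplacedMsg)) // true
	fmt.Println(landed(FailedMsg))   // false
}
```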
2 changes: 1 addition & 1 deletion venus-sector-manager/ver/ver.go
@@ -2,7 +2,7 @@ package ver

import "fmt"

const Version = "0.4.0-rc2"
const Version = "0.4.0-rc3"

var Commit string

3 changes: 2 additions & 1 deletion venus-worker/Cargo.lock


2 changes: 1 addition & 1 deletion venus-worker/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "venus-worker"
version = "0.4.0-rc2"
version = "0.4.0-rc3"
authors = ["dtynn <[email protected]>"]
edition = "2021"
exclude = [
4 changes: 4 additions & 0 deletions venus-worker/assets/venus-worker.mock.toml
@@ -51,6 +51,7 @@ location = "./mock-tmp/remote"
# enable_space_weighted = true

[processors.limitation.concurrent]
add_pieces = 5
pc1 = 3
pc2 = 2
c2 = 1
@@ -61,6 +62,9 @@ pc1 = "1min"
[processors.static_tree_d]
# 2KiB = "./tmp/2k/sc-02-data-tree-d.dat"

# fields for add_pieces processor
#[[processors.add_pieces]]

# fields for tree_d processor
[[processors.tree_d]]

10 changes: 7 additions & 3 deletions venus-worker/src/bin/venus-worker/processor/mod.rs
@@ -4,9 +4,9 @@ use vc_processors::{
builtin::{
processors::BuiltinProcessor,
tasks::{
SnapEncode, SnapProve, Transfer, TreeD, WindowPoSt, WinningPoSt, C2, PC1, PC2, STAGE_NAME_C2, STAGE_NAME_PC1, STAGE_NAME_PC2,
STAGE_NAME_SNAP_ENCODE, STAGE_NAME_SNAP_PROVE, STAGE_NAME_TRANSFER, STAGE_NAME_TREED, STAGE_NAME_WINDOW_POST,
STAGE_NAME_WINNING_POST,
AddPieces, SnapEncode, SnapProve, Transfer, TreeD, WindowPoSt, WinningPoSt, C2, PC1, PC2, STAGE_NAME_ADD_PIECES, STAGE_NAME_C2,
STAGE_NAME_PC1, STAGE_NAME_PC2, STAGE_NAME_SNAP_ENCODE, STAGE_NAME_SNAP_PROVE, STAGE_NAME_TRANSFER, STAGE_NAME_TREED,
STAGE_NAME_WINDOW_POST, STAGE_NAME_WINNING_POST,
},
},
core::ext::run_consumer,
@@ -15,6 +15,7 @@ use vc_processors::{
pub const SUB_CMD_NAME: &str = "processor";

pub(crate) fn subcommand<'a, 'b>() -> App<'a, 'b> {
let add_pieces_cmd = SubCommand::with_name(STAGE_NAME_ADD_PIECES);
let tree_d_cmd = SubCommand::with_name(STAGE_NAME_TREED);
let pc1_cmd = SubCommand::with_name(STAGE_NAME_PC1);
let pc2_cmd = SubCommand::with_name(STAGE_NAME_PC2);
@@ -27,6 +28,7 @@ pub(crate) fn subcommand<'a, 'b>() -> App<'a, 'b> {

SubCommand::with_name(SUB_CMD_NAME)
.setting(AppSettings::ArgRequiredElseHelp)
.subcommand(add_pieces_cmd)
.subcommand(tree_d_cmd)
.subcommand(pc1_cmd)
.subcommand(pc2_cmd)
@@ -40,6 +42,8 @@ pub(crate) fn submatch(subargs: &ArgMatches<'_>) -> Result<()> {

pub(crate) fn submatch(subargs: &ArgMatches<'_>) -> Result<()> {
match subargs.subcommand() {
(STAGE_NAME_ADD_PIECES, _) => run_consumer::<AddPieces, BuiltinProcessor>(),

(STAGE_NAME_PC1, _) => run_consumer::<PC1, BuiltinProcessor>(),

(STAGE_NAME_PC2, _) => run_consumer::<PC2, BuiltinProcessor>(),
9 changes: 7 additions & 2 deletions venus-worker/src/config.rs
@@ -168,8 +168,13 @@ pub struct Processors {
/// static tree_d paths for cc sectors
pub static_tree_d: Option<HashMap<String, String>>,

/// section for add_pieces processor
pub add_pieces: Option<Vec<Ext>>,

/// section for tree_d processor
pub tree_d: Option<Vec<Ext>>,

/// section for pc1 processor
pub pc1: Option<Vec<Ext>>,

/// section for pc2 processor
@@ -178,10 +183,10 @@ pub struct Processors {
/// section for c2 processor
pub c2: Option<Vec<Ext>>,

/// section for pc2 processor
/// section for snap_encode processor
pub snap_encode: Option<Vec<Ext>>,

/// section for c2 processor
/// section for snap_prove processor
pub snap_prove: Option<Vec<Ext>>,

/// section for transfer processor
5 changes: 4 additions & 1 deletion venus-worker/src/infra/piecestore/mod.rs
@@ -5,7 +5,10 @@ pub mod proxy;
use anyhow::Result;
use fil_types::UnpaddedPieceSize;
use forest_cid::Cid;
use reqwest::Url;

pub trait PieceStore: Send + Sync {
fn get(&self, c: Cid, payload_size: u64, target_size: UnpaddedPieceSize) -> Result<Box<dyn Read>>;
fn get(&self, c: &Cid, payload_size: u64, target_size: UnpaddedPieceSize) -> Result<Box<dyn Read>>;

fn url(&self, c: &Cid) -> Url;
}
22 changes: 15 additions & 7 deletions venus-worker/src/infra/piecestore/proxy.rs
@@ -14,7 +14,7 @@ use reqwest::{
use super::PieceStore;
use crate::util::reader::inflator;

const ENDPOINT: &str = "/piecestore/";
const ENDPOINT: &str = "piecestore";
const HEADER_AUTHORIZATION_BEARER_PREFIX: &str = "Bearer";

pub struct ProxyPieceStore {
@@ -26,8 +26,10 @@ pub struct ProxyPieceStore {

impl ProxyPieceStore {
pub fn new(host: &str, token: Option<String>) -> Result<Self> {
let h = Url::parse(host).context("parse host")?;
let base = h.join(ENDPOINT).context("build endpoint")?;
let mut base = Url::parse(host).context("parse host")?;
base.path_segments_mut()
.map_err(|_| anyhow!("url cannot be a base"))?
.push(ENDPOINT);

let client = ProxyPieceStore::build_http_client(Policy::none())?;
let redirect_client = ProxyPieceStore::build_http_client(Policy::default())?;
@@ -53,9 +55,9 @@ impl ProxyPieceStore {
}

impl PieceStore for ProxyPieceStore {
fn get(&self, c: Cid, payload_size: u64, target_size: UnpaddedPieceSize) -> Result<Box<dyn Read>> {
let url = self.base.join(&c.to_string()).context("invalid url")?;
let mut resp = self.client.get(url).send().context("request to peicestore")?;
fn get(&self, c: &Cid, payload_size: u64, target_size: UnpaddedPieceSize) -> Result<Box<dyn Read>> {
let u = self.url(c);
let mut resp = self.client.get(u.clone()).send().context("request to piecestore")?;

let mut status_code = resp.status();
if status_code.is_redirection() {
@@ -64,7 +66,7 @@ impl PieceStore for ProxyPieceStore {
.get(LOCATION)
.context("redirect location not found")
.and_then(|val| val.to_str().context("convert redirect location to str"))
.and_then(|s| Url::parse(s).context("parse redirect url"))?;
.and_then(|location| u.join(location).context("join redirect url"))?;

let mut req = self.redirect_client.get(redirect_url);
if let Some(token) = self.token.as_ref() {
@@ -81,4 +83,10 @@ impl PieceStore for ProxyPieceStore {
let r = inflator(resp, payload_size, target_size).context("build inflator reader")?;
Ok(Box::new(r))
}

fn url(&self, c: &Cid) -> Url {
let mut u = self.base.clone();
u.path_segments_mut().unwrap().push(&c.to_string());
u
}
}
