Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
…nto 568-nodes-list-generation-by-pattern

add changelog (qoollo#568)
  • Loading branch information
Tagir Asadullin committed Jan 31, 2024
2 parents fc19bd2 + 31cc454 commit 6a0bdb2
Show file tree
Hide file tree
Showing 8 changed files with 106 additions and 72 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,20 @@ Bob versions changelog
#### Changed
- Use cargo workspace to declare dependencies to avoid their duplication (#821)
- Record timestamp is now passed to Pearl level and used to find newest record in get and exist functions (#708)
- Save gRPC error when parsing status (#842)
- Increased number and max delay of retries for Bob state checking in integration tests (#856)
- Update writing logic for aliens integration tests to capture lost records problem (#851)
- Remove allocation on alien exist (#861)

#### Fixed
- Fix missing alien records due to multiple groups (#806)
- Fix warnings for imports (#858)


#### Updated
- Update crates versions: tokio, bytes, uuid, infer, base64, bitflags, regex, async-lock (#769)
- Pearl updated to v0.20.0 (#848)
- Pearl updated to v0.21.0 (#855)


## [2.1.0-alpha.12] - 2023-09-23
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ tokio = { version = "1.28", features = [] }

# pearl
[workspace.dependencies.pearl]
version = "0.20.0"
version = "0.21.0"


[profile.release]
Expand Down
11 changes: 4 additions & 7 deletions bob-backend/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,7 @@ extern crate lazy_static;
pub(crate) mod prelude {
pub use anyhow::{Context, Result as AnyResult};
pub use bob_common::{
configs::{
cluster::Cluster as ClusterConfig,
node::{BackendType, Node as NodeConfig, Pearl as PearlConfig},
},
configs::node::{BackendType, Node as NodeConfig, Pearl as PearlConfig},
data::{BobData, BobKey, BobMeta},
operation_options::{BobPutOptions, BobGetOptions, BobDeleteOptions},
core_types::{DiskName, DiskPath, VDiskId},
Expand All @@ -35,18 +32,18 @@ pub(crate) mod prelude {
pub use futures::{stream::FuturesUnordered, StreamExt, TryFutureExt};
pub use pearl::{
filter::hierarchical::*, filter::traits::*, filter::Config as BloomConfig, Builder,
Error as PearlError, ErrorKind as PearlErrorKind, IoDriver, Key as KeyTrait,
ErrorKind as PearlErrorKind, IoDriver, Key as KeyTrait,
RefKey as RefKeyTrait, Storage,
};
pub use std::{
collections::{hash_map::Entry, HashMap},
collections::HashMap,
convert::TryInto,
fmt::{Debug, Display, Formatter, Result as FmtResult},
fs::Metadata,
io::{Error as IOError, ErrorKind as IOErrorKind, Result as IOResult},
path::{Path, PathBuf},
sync::Arc,
time::{Duration, Instant, SystemTime, UNIX_EPOCH},
time::{Duration, Instant, SystemTime},
};
pub use tokio::{
fs::{create_dir_all, read_dir, remove_dir_all, remove_file, DirEntry},
Expand Down
118 changes: 72 additions & 46 deletions bob-backend/src/pearl/disk_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,13 +290,15 @@ impl DiskController {
Ok(groups)
}

async fn find_group(&self, operation: &Operation) -> BackendResult<Group> {
Self::find_in_groups(self.groups.read().await.iter(), operation).ok_or_else(|| {
Error::failed(format!("cannot find actual alien folder. {:?}", operation))
})
async fn find_all_groups(&self, operation: &Operation) -> smallvec::SmallVec<[Group; 1]> {
self.groups.read().await
.iter()
.filter(|group| group.can_process_operation(operation))
.cloned()
.collect()
}

fn find_in_groups<'pearl, 'op>(
fn find_single_group<'pearl, 'op>(
mut pearls: impl Iterator<Item = &'pearl Group>,
operation: &'op Operation,
) -> Option<Group> {
Expand All @@ -310,14 +312,14 @@ impl DiskController {
// block for read lock: it should be dropped before write lock is acquired (otherwise - deadlock)
{
let read_lock_groups = self.groups.read().await;
let pearl = Self::find_in_groups(read_lock_groups.iter(), operation);
let pearl = Self::find_single_group(read_lock_groups.iter(), operation);
if let Some(g) = pearl {
return Ok(g);
}
}

let mut write_lock_groups = self.groups.write().await;
let pearl = Self::find_in_groups(write_lock_groups.iter(), operation);
let pearl = Self::find_single_group(write_lock_groups.iter(), operation);
if let Some(g) = pearl {
return Ok(g);
}
Expand Down Expand Up @@ -465,15 +467,22 @@ impl DiskController {

pub(crate) async fn get_alien(&self, op: Operation, key: BobKey) -> Result<BobData, Error> {
if *self.state.read().await == GroupsState::Ready {
let vdisk_group = self.find_group(&op).await;
if let Ok(group) = vdisk_group {
group.get(key).await
let mut result: Option<BobData> = None;
for g in self.find_all_groups(&op).await {
match g.get(key).await {
Ok(data) if data.meta().timestamp() > result.as_ref().map_or(0, |d| d.meta().timestamp()) => {
result = Some(data);
},
Ok(_) => { },
Err(e) if e.is_key_not_found() => { },
Err(e) => {
debug!("error getting data from alien for op {:?}: {:?}", op, e);
},
}
}
if let Some(r) = result {
Ok(r)
} else {
debug!(
"GET[alien][{}] No alien group has been created for vdisk #{}",
key,
op.vdisk_id()
);
Err(Error::key_not_found(key))
}
} else {
Expand Down Expand Up @@ -506,20 +515,25 @@ impl DiskController {

pub(crate) async fn exist_alien(
&self,
operation: Operation,
op: Operation,
keys: &[BobKey],
) -> Result<Vec<bool>, Error> {
if *self.state.read().await == GroupsState::Ready {
let vdisk_group = self.find_group(&operation).await;
if let Ok(group) = vdisk_group {
group.exist(keys).await
} else {
trace!(
"EXIST[alien] No alien group has been created for vdisk #{}",
operation.vdisk_id()
);
Ok(vec![false; keys.len()])
let mut result: Option<Vec<bool>> = None;
for g in self.find_all_groups(&op).await {
match g.exist(keys).await {
Ok(r) => {
result = result.map(|mut result| {
for i in 0..r.len() {
result[i] |= r[i];
}
result
}).or(Some(r));
},
Err(e) => debug!("error getting exist results for op {:?}: {:?}", op, e),
}
}
Ok(result.unwrap_or_else(|| vec![false; keys.len()]))
} else {
Err(Error::dc_is_not_available())
}
Expand Down Expand Up @@ -561,30 +575,42 @@ impl DiskController {
force_delete: bool,
) -> Result<u64, Error> {
if *self.state.read().await == GroupsState::Ready {
let vdisk_group =
if !force_delete {
match self.find_group(&op).await {
Ok(group) => group,
Err(_) => return Ok(0)
let groups = self.find_all_groups(&op).await;
if force_delete && groups.is_empty() {
// If delete is forced we need to create at least one group
match self.get_or_create_pearl(&op).await {
Ok(group) => match group.delete(key, meta, StartTimestampConfig::new(false), force_delete).await {
Ok(r) => Ok(r),
Err(e) => {
debug!("DELETE[alien][{}] Error delete in one of the groups, op: {:?}, err: {}", key, op, e);
Err(self.process_error(e).await)
},
},
Err(e) => {
error!(
"DELETE[alien][{}] Cannot find group, op: {:?}, err: {}",
key, op, e
);
Err(Error::vdisk_not_found(op.vdisk_id()))
}
} else {
// we should create group only when force_delete == true
match self.get_or_create_pearl(&op).await {
Ok(group) => group,
Err(err) => {
error!(
"DELETE[alien][{}] Cannot find group, op: {:?}, err: {}",
key, op, err
);
return Err(Error::vdisk_not_found(op.vdisk_id()));
}
} else {
let mut result = 0;
let mut err = None;
for g in groups {
match g.delete(key, meta, StartTimestampConfig::new(false), force_delete).await {
Ok(r) => result += r,
Err(e) => {
debug!("DELETE[alien][{}] Error delete in one of the groups, op: {:?}, err: {}", key, op, e);
err = Some(self.process_error(e).await)
}
}
};

match vdisk_group.delete(key, meta, StartTimestampConfig::new(false), force_delete).await
{
Err(e) => Err(self.process_error(e).await),
Ok(x) => Ok(x),
}
if let Some(e) = err {
Err(e)
} else {
Ok(result)
}
}
} else {
Err(Error::dc_is_not_available())
Expand Down
1 change: 1 addition & 0 deletions bob-backend/src/pearl/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::prelude::*;

use super::{core::BackendResult, Pearl as PearlBackend};
use crate::core::{BackendStorage, Operation};
use bob_common::configs::cluster::Cluster as ClusterConfig;

static DISK_NAME: &str = "disk1";
static PEARL_PATH: &str = "/tmp/d1/";
Expand Down
29 changes: 15 additions & 14 deletions bob-common/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,23 +156,24 @@ impl From<Status> for Error {
let name = words.next();
let length = status.message().len();
match name {
None => None,
None => Self::failed(format!("Can't parse status from {:?}, {}", status.code(), status.message())),
Some(name) => match name {
"KeyNotFound" => parse_next(words, Self::key_not_found),
"DuplicateKey" => Some(Self::duplicate_key()),
"Timeout" => Some(Self::timeout()),
"VDiskNotFound" => parse_next(words, Self::vdisk_not_found),
"Storage" => Some(Self::storage(rest_words(words, length))),
"VDiskIsNotReady" => Some(Self::vdisk_is_not_ready()),
"Failed" => Some(Self::failed(rest_words(words, length))),
"Internal" => Some(Self::internal()),
"PearlChangeState" => Some(Self::pearl_change_state(rest_words(words, length))),
"Unauthorized" => Some(Self::unauthorized()),
"HolderTemporaryUnavailable" => Some(Self::holder_temporary_unavailable()),
_ => None,
"KeyNotFound" => parse_next(words, Self::key_not_found)
.unwrap_or_else(|| Self::failed(format!("Failed to parse key from {}", status.message()))),
"DuplicateKey" => Self::duplicate_key(),
"Timeout" => Self::timeout(),
"VDiskNotFound" => parse_next(words, Self::vdisk_not_found)
.unwrap_or_else(|| Self::failed(format!("Failed to parse vdisk_id from {}", status.message()))),
"Storage" => Self::storage(rest_words(words, length)),
"VDiskIsNotReady" => Self::vdisk_is_not_ready(),
"Failed" => Self::failed(rest_words(words, length)),
"Internal" => Self::internal(),
"PearlChangeState" => Self::pearl_change_state(rest_words(words, length)),
"Unauthorized" => Self::unauthorized(),
"HolderTemporaryUnavailable" => Self::holder_temporary_unavailable(),
_ => Self::failed(format!("Can't parse status {:?} from {:?}, {}", name, status.code(), status.message())),
},
}
.unwrap_or_else(|| Self::failed("Can't parse status"))
}
}

Expand Down
2 changes: 1 addition & 1 deletion integration-tests/misc_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ def request_metrics(port):
raise UserWarning('Failed to get metrics, retrying...')


@retry((UserWarning), tries=5, delay=1, backoff=2, max_delay=5)
@retry((UserWarning), tries=10, delay=1, backoff=1.75, max_delay=15)
def ensure_backend_up(bob_nodes_amount, rest_min_port):
try:
for item in range(bob_nodes_amount):
Expand Down
8 changes: 5 additions & 3 deletions integration-tests/tests_aliens.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,10 @@ def run_tests(behaviour, args):
#runs put and stops nodes in cycle
written_count = 0
try:
for i in range(1, parsed_args.nodes_amount + 1):
#make correctly formatted args
for i in range(parsed_args.nodes_amount):
#make correctly formatted args
dict_args = make_run_args(parsed_args, written_count, record_amount)
dict_args['-p'] = str(parsed_args.transport_min_port + i)
bobp_args = args_to_str(dict_args)
#run put
print(f'Running bobp -b put {bobp_args.rstrip()}')
Expand All @@ -91,7 +92,7 @@ def run_tests(behaviour, args):
if not 'total err: 0' in str(p):
print_then_exit(f'Put test failed, see output.')
written_count += dict_args.get('-c')
if i < parsed_args.nodes_amount:
if i < parsed_args.nodes_amount - 1:
#stops one
sleep(10)
d_cli.container.stop(container_dict[str(parsed_args.transport_min_port + i)])
Expand Down Expand Up @@ -119,6 +120,7 @@ def run_tests(behaviour, args):


dict_args = make_run_args(parsed_args, 0, str(written_count))
dict_args['-p'] = str(parsed_args.transport_min_port + parsed_args.nodes_amount - 1)
bobp_args = args_to_str(dict_args)
for item in run_options:
run_tests(item, bobp_args)

0 comments on commit 6a0bdb2

Please sign in to comment.