Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Perform HEAD request for HttpStore::head #4837

Merged
merged 5 commits into from
Sep 23, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 14 additions & 6 deletions object_store/src/client/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use crate::client::header::header_meta;
use crate::client::header::{header_meta, HeaderConfig};
use crate::path::Path;
use crate::{Error, GetOptions, GetResult, ObjectMeta};
use crate::{GetResultPayload, Result};
Expand All @@ -28,6 +28,12 @@ use reqwest::Response;
pub trait GetClient: Send + Sync + 'static {
const STORE: &'static str;

/// Configure the [`HeaderConfig`] for this client
const HEADER_CONFIG: HeaderConfig = HeaderConfig {
etag_required: true,
last_modified_required: true,
};
Comment on lines +31 to +35
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think using Default would make the intent clearer here

Suggested change
/// Configure the [`HeaderConfig`] for this client
const HEADER_CONFIG: HeaderConfig = HeaderConfig {
etag_required: true,
last_modified_required: true,
};
/// Configure the [`HeaderConfig`] used for requests issued by this client
const HEADER_CONFIG: HeaderConfig = HeaderConfig::default();

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sadly this isn't possible as Default is not const


async fn get_request(
&self,
path: &Path,
Expand All @@ -49,10 +55,12 @@ impl<T: GetClient> GetClientExt for T {
async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
let range = options.range.clone();
let response = self.get_request(location, options, false).await?;
let meta = header_meta(location, response.headers(), Default::default())
.map_err(|e| Error::Generic {
store: T::STORE,
source: Box::new(e),
let meta =
header_meta(location, response.headers(), T::HEADER_CONFIG).map_err(|e| {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It took me a while to track down that the default for HEADER_CONFIG is the same as HeaderConfig::default and thus this isn't a change for S3Client and other implementations

Error::Generic {
store: T::STORE,
source: Box::new(e),
}
})?;

let stream = response
Expand All @@ -73,7 +81,7 @@ impl<T: GetClient> GetClientExt for T {
async fn head(&self, location: &Path) -> Result<ObjectMeta> {
let options = GetOptions::default();
let response = self.get_request(location, options, true).await?;
header_meta(location, response.headers(), Default::default()).map_err(|e| {
header_meta(location, response.headers(), T::HEADER_CONFIG).map_err(|e| {
Error::Generic {
store: T::STORE,
source: Box::new(e),
Expand Down
2 changes: 1 addition & 1 deletion object_store/src/client/header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use hyper::header::{CONTENT_LENGTH, ETAG, LAST_MODIFIED};
use hyper::HeaderMap;
use snafu::{OptionExt, ResultExt, Snafu};

#[derive(Debug)]
#[derive(Debug, Copy, Clone)]
/// Configuration for header extraction
pub struct HeaderConfig {
/// Whether to require an ETag header when extracting [`ObjectMeta`] from headers.
Expand Down
1 change: 0 additions & 1 deletion object_store/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ pub mod retry;
#[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
pub mod pagination;

#[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
pub mod get;

#[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
Expand Down
88 changes: 55 additions & 33 deletions object_store/src/http/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,14 @@
// specific language governing permissions and limitations
// under the License.

use crate::client::get::GetClient;
use crate::client::header::HeaderConfig;
use crate::client::retry::{self, RetryConfig, RetryExt};
use crate::client::GetOptionsExt;
use crate::path::{Path, DELIMITER};
use crate::util::deserialize_rfc1123;
use crate::{ClientOptions, GetOptions, ObjectMeta, Result};
use async_trait::async_trait;
use bytes::{Buf, Bytes};
use chrono::{DateTime, Utc};
use percent_encoding::percent_decode_str;
Expand Down Expand Up @@ -238,39 +241,6 @@ impl Client {
Ok(())
}

pub async fn get(&self, location: &Path, options: GetOptions) -> Result<Response> {
let url = self.path_url(location);
let builder = self.client.get(url);
let has_range = options.range.is_some();

let res = builder
.with_get_options(options)
.send_retry(&self.retry_config)
.await
.map_err(|source| match source.status() {
// Some stores return METHOD_NOT_ALLOWED for get on directories
Some(StatusCode::NOT_FOUND | StatusCode::METHOD_NOT_ALLOWED) => {
crate::Error::NotFound {
source: Box::new(source),
path: location.to_string(),
}
}
_ => Error::Request { source }.into(),
})?;

// We expect a 206 Partial Content response if a range was requested
// a 200 OK response would indicate the server did not fulfill the request
if has_range && res.status() != StatusCode::PARTIAL_CONTENT {
return Err(crate::Error::NotSupported {
source: Box::new(Error::RangeNotSupported {
href: location.to_string(),
}),
});
}

Ok(res)
}

pub async fn copy(&self, from: &Path, to: &Path, overwrite: bool) -> Result<()> {
let mut retry = false;
loop {
Expand Down Expand Up @@ -307,6 +277,58 @@ impl Client {
}
}

#[async_trait]
impl GetClient for Client {
const STORE: &'static str = "HTTP";

const HEADER_CONFIG: HeaderConfig = HeaderConfig {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps you can add an explanatory comment about why this client overrides the default HEADER_CONFIG (so as to support http servers that don't implement WEBDAV?)

etag_required: false,
last_modified_required: false,
};

async fn get_request(
&self,
location: &Path,
options: GetOptions,
head: bool,
) -> Result<Response> {
let url = self.path_url(location);
let method = match head {
true => Method::HEAD,
false => Method::GET,
};
let has_range = options.range.is_some();
let builder = self.client.request(method, url);

let res = builder
.with_get_options(options)
.send_retry(&self.retry_config)
.await
.map_err(|source| match source.status() {
// Some stores return METHOD_NOT_ALLOWED for get on directories
Some(StatusCode::NOT_FOUND | StatusCode::METHOD_NOT_ALLOWED) => {
crate::Error::NotFound {
source: Box::new(source),
path: location.to_string(),
}
}
_ => Error::Request { source }.into(),
})?;

// We expect a 206 Partial Content response if a range was requested
// a 200 OK response would indicate the server did not fulfill the request
if has_range && res.status() != StatusCode::PARTIAL_CONTENT {
return Err(crate::Error::NotSupported {
source: Box::new(Error::RangeNotSupported {
href: location.to_string(),
}),
});
}

Ok(res)
}
}

/// The response returned by a PROPFIND request, i.e. list
#[derive(Deserialize, Default)]
pub struct MultiStatus {
Expand Down
47 changes: 6 additions & 41 deletions object_store/src/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,18 +34,18 @@
use async_trait::async_trait;
use bytes::Bytes;
use futures::stream::BoxStream;
use futures::{StreamExt, TryStreamExt};
use futures::StreamExt;
use itertools::Itertools;
use snafu::{OptionExt, ResultExt, Snafu};
use tokio::io::AsyncWrite;
use url::Url;

use crate::client::header::{header_meta, HeaderConfig};
use crate::client::get::GetClientExt;
use crate::http::client::Client;
use crate::path::Path;
use crate::{
ClientConfigKey, ClientOptions, GetOptions, GetResult, GetResultPayload, ListResult,
MultipartId, ObjectMeta, ObjectStore, Result, RetryConfig,
ClientConfigKey, ClientOptions, GetOptions, GetResult, ListResult, MultipartId,
ObjectMeta, ObjectStore, Result, RetryConfig,
};

mod client;
Expand Down Expand Up @@ -115,46 +115,11 @@ impl ObjectStore for HttpStore {
}

async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
let range = options.range.clone();
let response = self.client.get(location, options).await?;
let cfg = HeaderConfig {
last_modified_required: false,
etag_required: false,
};
let meta =
header_meta(location, response.headers(), cfg).context(MetadataSnafu)?;

let stream = response
.bytes_stream()
.map_err(|source| Error::Reqwest { source }.into())
.boxed();

Ok(GetResult {
payload: GetResultPayload::Stream(stream),
range: range.unwrap_or(0..meta.size),
meta,
})
self.client.get_opts(location, options).await
}

async fn head(&self, location: &Path) -> Result<ObjectMeta> {
let status = self.client.list(Some(location), "0").await?;
match status.response.len() {
1 => {
let response = status.response.into_iter().next().unwrap();
response.check_ok()?;
match response.is_dir() {
true => Err(crate::Error::NotFound {
path: location.to_string(),
source: "Is directory".to_string().into(),
}),
false => response.object_meta(self.client.base_url()),
}
}
x => Err(crate::Error::NotFound {
path: location.to_string(),
source: format!("Expected 1 result, got {x}").into(),
}),
}
self.client.head(location).await
}

async fn delete(&self, location: &Path) -> Result<()> {
Expand Down