Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: Remote analysis on cloud object-storage. #1037

Merged
merged 6 commits into from
Dec 1, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,7 @@ pycryptodome

# This is required for memory acquisition via leechcore/pcileech.
leechcorepyc>=2.4.0

# This is required for memory analysis on a Amazon/MinIO S3 and Google Cloud object storage
gcsfs>=2023.6.0
s3fs>=2023.6.0
57 changes: 57 additions & 0 deletions volatility3/framework/layers/cloudstorage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# This file is Copyright 2022 Volatility Foundation and licensed under the Volatility Software License 1.0
# which is available at https://www.volatilityfoundation.org/license/vsl-v1.0
#

import logging
import urllib.parse
from typing import Optional, Any, List

try:
import s3fs
HAS_S3FS = True
except ImportError:
HAS_S3FS = False

try:
import gcsfs
HAS_GCSFS = True
except ImportError:
HAS_GCSFS = False

from volatility3.framework import exceptions

Check notice

Code scanning / CodeQL

Unused import Note

Import of 'exceptions' is not used.
from volatility3.framework.layers import resources

vollog = logging.getLogger(__file__)

class S3FileSystemHandler(resources.VolatilityHandler):
if HAS_S3FS:
@classmethod
def non_cached_schemes(cls) -> List[str]:
return ["s3"]

@staticmethod
def default_open(req: urllib.request.Request) -> Optional[Any]:
"""Handles the request if it's the s3 scheme."""
if req.type == "s3":
object_uri = "://".join(req.full_url.split("://")[1:])
return s3fs.S3FileSystem().open(object_uri)
return None
else:
raise exceptions.LayerException("s3 requirement is missing.")


class GSFileSystemHandler(resources.VolatilityHandler):
if HAS_GCSFS:
@classmethod
def non_cached_schemes(cls) -> List[str]:
return ["gs"]

@staticmethod
def default_open(req: urllib.request.Request) -> Optional[Any]:
"""Handles the request if it's the gs scheme."""
if req.type == "gs":
object_uri = "://".join(req.full_url.split("://")[1:])
return gcsfs.GCSFileSystem().open(object_uri)
return None
else:
raise exceptions.LayerException("gcsfs requirement is missing.")