Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Image folder processing and load balancing. #103

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
VERSION=7.1.0
VERSION=7.2.1

SHELL := /bin/bash

Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "redact"
version = "7.1.0"
version = "7.2.1"
description = "Command-line and Python client for Brighter AI's Redact"
authors = ["brighter AI <[email protected]>"]
readme = "README.md"
Expand Down
2 changes: 1 addition & 1 deletion redact/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
Python client for "brighter Redact"
"""

__version__ = "7.1.0"
__version__ = "7.2.1"

from .errors import RedactConnectError, RedactResponseError
from .v4.data_models import (
Expand Down
181 changes: 180 additions & 1 deletion redact/commons/utils.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
import glob
import logging
from pathlib import Path
from typing import List, Union
import shutil
from typing import List, Union, Optional
import tempfile
import tarfile
import fnmatch
import os
import re

from redact.settings import Settings

Expand Down Expand Up @@ -126,3 +132,176 @@ def parse_key_value_pairs(kv_pairs: List[str]) -> dict:
result[key] = value

return result


def is_folder_with_images(dir_path: Union[str, Path]):
    """Tell whether ``dir_path`` is an existing directory holding at least one image.

    Returns ``False`` when the path is not a directory, otherwise ``True`` only
    if ``DirectoryImageFinder`` reports one or more image names in it.
    """
    # Guard first: anything that is not a directory can never qualify.
    if not os.path.isdir(dir_path):
        return False
    found_images = DirectoryImageFinder().find_images(dir_path)
    return len(found_images) > 0


class DirectoryImageFinder:
    """Recognises image files (``*.jpeg``, ``*.jpg``, ``*.png``) by their name."""

    def __init__(self):
        # A single alternation built from the translated glob patterns.
        # Names are lower-cased before matching, so the check ignores case.
        glob_patterns = ("*.jpeg", "*.jpg", "*.png")
        self._image_re = re.compile(
            "|".join(fnmatch.translate(pattern) for pattern in glob_patterns)
        )

    def find_images(self, directory: Union[str, Path]):
        """Return the names (not full paths) of entries in ``directory`` that match an image pattern.

        The order is whatever ``os.listdir`` yields; callers that need a stable
        order must sort the result themselves.
        """
        entries = os.listdir(os.path.abspath(directory))
        return [entry for entry in entries if self.is_image(entry)]

    def is_image(self, f):
        """Return ``True`` if the name ``f`` ends in one of the image extensions, ignoring case."""
        matched = self._image_re.match(f.lower())
        return matched is not None


class ImageFolderVideoHandler(object):
    """Feeds the images of one folder through tar archives, batch by batch.

    Intended to be used as a context manager. The typical sequence per batch is:
    ``has_more()`` -> ``prepare_video_image_folder()`` (builds ``input_tar`` and
    reserves the ``output_tar`` name) -> external processing that writes
    ``output_tar`` -> ``unpack_and_rename_output()``. On exit every temporary
    file and directory registered along the way is removed if it still exists.

    Attributes:
        input_tar: Path of the tar archive holding the current input batch, or ``None``.
        output_tar: Path reserved for the tar archive with the processed batch, or ``None``.
    """

    def __init__(
        self,
        input_dir_path: Union[str, Path],
        output_path: Union[str, Path],
        file_batch_size: int,
    ):
        """Store the configuration; no file-system access happens here.

        Args:
            input_dir_path: Folder containing the images to process.
            output_path: Folder that receives the processed images.
            file_batch_size: Maximum number of images per batch; a value ``<= 0``
                puts all images into a single batch.
        """
        self._input_file_names: List[str] = []
        self._input_dir_path = input_dir_path
        self._output_path = output_path
        self._file_batch_size = file_batch_size
        self._files_to_clean: List[Union[str, Path]] = []
        self._directories_to_clean: List[Union[str, Path]] = []
        # Lazily filled by _prepare_batches() on the first has_more() call.
        self._batches: Optional[List[List[str]]] = None
        # Index of the batch most recently handed out; -1 means "none yet".
        self._current_batch = -1
        self.input_tar = None
        self.output_tar = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Remove all registered temporary files and directories that still exist.

        Returns ``None`` so an exception raised inside the ``with`` body propagates.
        """
        for file_path in self._files_to_clean:
            if os.path.exists(file_path):
                os.remove(file_path)
        for dir_path in self._directories_to_clean:
            if os.path.exists(dir_path):
                shutil.rmtree(dir_path)

    def has_more(self) -> bool:
        """Return ``True`` while at least one batch has not been prepared yet.

        The first call scans the input folder and builds the batches, so it may
        raise ``ValueError`` (see ``_prepare_batches``).
        """
        if self._batches is None:
            self._prepare_batches()
        return (self._current_batch + 1) < len(self._batches)

    def _prepare_batches(self):
        """Validate the paths and split the sorted image names into batches.

        Raises:
            ValueError: If the input path is not a folder with at least one
                image, or if input and output refer to the same folder.
        """
        if not is_folder_with_images(self._input_dir_path):
            raise ValueError(
                "Provide a folder with images when using flag video_as_image_folders"
            )
        # Compare resolved paths: a plain ``==`` misses str-vs-Path, trailing
        # separators, relative-vs-absolute spellings and symlinks to the same folder.
        if os.path.realpath(self._input_dir_path) == os.path.realpath(
            self._output_path
        ):
            raise ValueError(
                "When processing video image folders, output path cannot be equal to input path."
            )

        input_file_names = sorted(
            DirectoryImageFinder().find_images(self._input_dir_path)
        )
        self._input_file_names = input_file_names

        if self._file_batch_size <= 0:
            self._batches = [input_file_names]
        else:
            self._batches = [
                input_file_names[i : i + self._file_batch_size]
                for i in range(0, len(input_file_names), self._file_batch_size)
            ]

    def remove_input_tar(self):
        """Delete the current input tar (if it exists) and reset ``input_tar`` to ``None``."""
        if self.input_tar is not None and os.path.exists(self.input_tar):
            os.remove(self.input_tar)
        self.input_tar = None

    def add_directory_to_clean(self, dir: Path):
        """Add a directory that is removed with all files on exit."""
        self._directories_to_clean.append(dir)

    def remove_directory_to_clean(self, dir: Path):
        """Remove a directory from the list of dirs to remove on exit.

        Raises:
            ValueError: If the directory was never registered.
        """
        self._directories_to_clean.remove(dir)

    def prepare_video_image_folder(self):
        """Create temp files and tar the images of the next batch.

        Sets ``input_tar`` to a freshly written tar inside the input folder and
        ``output_tar`` to a not-yet-existing ``.tar`` path inside the output folder.

        Raises:
            RuntimeError: If ``has_more`` was never called or no batch is left.
        """
        if self._batches is None or not self.has_more():
            raise RuntimeError("Please call has_more before prepare_video_image_folder")

        self._current_batch += 1
        current_batch = self._batches[self._current_batch].copy()
        logging.debug(
            f"Preparing next batch: {self._current_batch + 1}/{len(self._batches)} of {len(current_batch)} files."
        )

        # Only used to obtain a unique file name; the archive is written below.
        with tempfile.NamedTemporaryFile(
            mode="w+b", dir=self._input_dir_path, delete=False, suffix=".tar"
        ) as temp_file:
            self.input_tar = temp_file.name
        self._files_to_clean.append(self.input_tar)

        with tarfile.open(self.input_tar, "w:") as tar:
            for file_name in current_batch:
                full_path = os.path.join(self._input_dir_path, file_name)
                # arcname keeps the archive flat: members carry the bare file name.
                tar.add(full_path, arcname=file_name)

        # NOTE(review): tempfile.mktemp is deprecated because the name is not
        # reserved on disk. It is kept here since this class never creates the
        # output tar itself - presumably the downstream writer expects the path
        # not to exist yet (e.g. with skip_existing); confirm before replacing.
        self.output_tar = tempfile.mktemp(suffix=".tar", dir=self._output_path)
        self._files_to_clean.append(self.output_tar)

    def unpack_and_rename_output(self):
        """After processing, ensures the (batch's) files are in the output folder and correctly named.

        Extracts ``output_tar`` into a temporary folder below the output folder,
        deletes the tar, moves the images to the output folder under their input
        names and removes the temporary folder.

        Raises:
            RuntimeError: If the number of extracted images differs from the batch size.
        """
        temp_folder = tempfile.mkdtemp(dir=self._output_path)
        self.add_directory_to_clean(Path(temp_folder))

        with tarfile.open(self.output_tar, "r") as output_tarfile:
            # The archive comes from an external service, so use the "data"
            # extraction filter wherever the running Python provides it; older
            # interpreters fall back to the unfiltered extraction.
            if hasattr(tarfile, "data_filter"):
                output_tarfile.extractall(temp_folder, filter="data")
            else:
                output_tarfile.extractall(temp_folder)
        os.remove(self.output_tar)

        self._check_and_rename_output_files(temp_folder)
        os.rmdir(temp_folder)

    def _check_and_rename_output_files(self, temp_folder: str):
        """Move the extracted images to the output folder under their input names.

        Input and output names are paired by sorted position. Each file is moved
        directly to its final destination; renaming inside ``temp_folder`` first
        could overwrite a not-yet-renamed output whose name equals another
        input name.

        Raises:
            RuntimeError: If the number of images in ``temp_folder`` does not
                match the number of images in the current batch.
        """
        input_images = sorted(self._batches[self._current_batch])
        output_images = sorted(DirectoryImageFinder().find_images(temp_folder))

        if len(output_images) != len(input_images):
            raise RuntimeError(
                "Count of images in input batch and images returned from service unequal!"
            )

        for output_image, input_image in zip(output_images, input_images):
            shutil.move(
                os.path.join(temp_folder, output_image),
                os.path.join(self._output_path, input_image),
            )
76 changes: 58 additions & 18 deletions redact/tools/v4.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@
from redact.commons.utils import parse_key_value_pairs, setup_logging
from redact.settings import Settings
from redact.v4 import InputType, JobArguments, OutputType, Region, ServiceType
from redact.v4.tools.redact_file import redact_file as rdct_file
from redact.v4.tools.redact_file import (
redact_file as rdct_file,
redact_video_as_image_folder,
)
from redact.v4.tools.redact_folder import redact_folder as rdct_folder

settings = Settings()
Expand Down Expand Up @@ -137,6 +140,15 @@ def redact_file(
[],
help="Key-value pairs in the format key=value which will be added to allr equest header",
),
video_as_image_folders: bool = typer.Option(
False,
help="Enable processing of leaf directories with images "
"as videos with frames in alphabetic order.",
),
video_as_image_folders_batch_size: int = typer.Option(
1500,
help="Sets the size of the batches in images.",
),
):
setup_logging(verbose_logging)

Expand All @@ -157,20 +169,37 @@ def redact_file(
areas_of_interest=areas_of_interest,
)

rdct_file(
file_path=file_path,
output_type=output_type,
service=service,
job_args=job_args,
licence_plate_custom_stamp_path=licence_plate_custom_stamp_path,
redact_url=redact_url,
api_key=api_key,
output_path=output_path,
ignore_warnings=ignore_warnings,
skip_existing=skip_existing,
auto_delete_job=auto_delete_job,
custom_headers=parsed_header,
)
if video_as_image_folders:
redact_video_as_image_folder(
dir_path=file_path,
output_type=output_type,
service=service,
job_args=job_args,
licence_plate_custom_stamp_path=licence_plate_custom_stamp_path,
redact_url=redact_url,
api_key=api_key,
output_path=output_path,
ignore_warnings=ignore_warnings,
skip_existing=skip_existing,
auto_delete_job=auto_delete_job,
custom_headers=parsed_header,
file_batch_size=video_as_image_folders_batch_size,
)
else:
rdct_file(
file_path=file_path,
output_type=output_type,
service=service,
job_args=job_args,
licence_plate_custom_stamp_path=licence_plate_custom_stamp_path,
redact_url=redact_url,
api_key=api_key,
output_path=output_path,
ignore_warnings=ignore_warnings,
skip_existing=skip_existing,
auto_delete_job=auto_delete_job,
custom_headers=parsed_header,
)


@app.command()
Expand Down Expand Up @@ -254,9 +283,9 @@ def redact_folder(
help="A URL to call when the status of the Job changes",
show_default=False,
),
redact_url: str = typer.Option(
settings.redact_online_url,
help="Specify http address or ip of the redact instance",
redact_url: List[str] = typer.Option(
[settings.redact_online_url],
help="Specify http address or ip of the redact instance, or multiple for client-side load balancing",
),
api_key: Optional[str] = typer.Option(
None,
Expand Down Expand Up @@ -294,6 +323,15 @@ def redact_folder(
[],
help="Key-value pairs in the format key=value which will be added to allr equest header",
),
video_as_image_folders: bool = typer.Option(
False,
help="Enable processing of leaf directories with images "
"as videos with frames in alphabetic order.",
),
video_as_image_folders_batch_size: int = typer.Option(
1500,
help="Sets the size of the batches in images.",
),
):
setup_logging(verbose_logging)

Expand Down Expand Up @@ -330,4 +368,6 @@ def redact_folder(
auto_delete_job=auto_delete_job,
auto_delete_input_file=auto_delete_input_file,
custom_headers=parsed_header,
video_as_image_folders=video_as_image_folders,
video_as_image_folders_batch_size=video_as_image_folders_batch_size,
)
1 change: 1 addition & 0 deletions redact/v3/tools/redact_folder.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ def _try_redact_file_with_relative_path(
except Exception as e:
log.debug(f"Unexpected exception: {e}", exc_info=e)
log.error(f"Error while anonymize {relative_file_path}: {str(e)}")
return None


def _redact_file_with_relative_path(
Expand Down
Loading
Loading