#!/usr/bin/env python3
import json
import random
import warnings
from datetime import datetime
from pathlib import Path
import numpy
from PIL import Image, ImageEnhance
from draugr.visualisation import progress_bar
from neodroidvision.data.synthesis.conversion.mnist.json_generation.mask_json_utilities import (
MaskJsonUtils,
)
__all__ = ["ImageComposition"]
[docs]class ImageComposition:
"""Composes images together in random ways, applying transformations to the foreground to create a
synthetic
combined image."""
verbose = False
[docs] def __init__(self):
self.allowed_output_types = [".png", ".jpg", ".jpeg"]
self.allowed_background_types = [".png", ".jpg", ".jpeg"]
self.zero_padding = 8 # 00000027.png, supports up to 100 million images
self.max_foregrounds = 3
self.mask_colors = [(255, 0, 0), (0, 255, 0), (0, 0, 255)]
assert (
len(self.mask_colors) >= self.max_foregrounds
), "length of mask_colors should be >= max_foregrounds"
def _validate_and_process_args(self, args):
# Validates input arguments and sets up class variables
# Args:
# args: the ArgumentParser command line arguments
self.silent = args.silent
# Validate the count
assert args.count > 0, "count must be greater than 0"
self.count = args.count
# Validate the width and height
assert args._width >= 64, "width must be greater than 64"
self.width = args._width
assert args._height >= 64, "height must be greater than 64"
self.height = args._height
# Validate and process the output type
if args.output_type is None:
self.output_type = ".jpg" # default
else:
if args.output_type[0] != ".":
self.output_type = f".{args.output_type}"
assert self.output_type in self.allowed_output_types, (
f"output_type is not supported: " f"{self.output_type}"
)
# Validate and process output and input directories
self._validate_and_process_output_directory()
self._validate_and_process_input_directory()
def _validate_and_process_output_directory(self):
self.output_dir = Path(config.output_dir)
self.images_output_dir = self.output_dir / "images"
self.masks_output_dir = self.output_dir / "masks"
# Create directories
self.output_dir.mkdir(exist_ok=True)
self.images_output_dir.mkdir(exist_ok=True)
self.masks_output_dir.mkdir(exist_ok=True)
if not self.silent:
# Check for existing contents in the images directory
for _ in self.images_output_dir.iterdir():
# We found something, check if the user wants to overwrite files or quit
should_continue = input(
"output_dir is not empty, files may be overwritten.\nContinue (y/n)? "
).lower()
if should_continue != "y" and should_continue != "yes":
quit()
break
def _validate_and_process_input_directory(self):
self.input_dir = Path(config.input_dir)
assert self.input_dir.exists(), f"input_dir does not exist: {config.input_dir}"
for x in self.input_dir.iterdir():
if x.name == "foregrounds":
self.foregrounds_dir = x
elif x.name == "backgrounds":
self.backgrounds_dir = x
assert (
self.foregrounds_dir is not None
), "foregrounds subdirectory was not found in the input_dir"
assert (
self.backgrounds_dir is not None
), "backgrounds subdirectory was not found in the input_dir"
self._validate_and_process_foregrounds()
self._validate_and_process_backgrounds()
def _validate_and_process_foregrounds(self):
# Validates input foregrounds and processes them into a foregrounds dictionary.
# Expected directory structure:
# + foregrounds_dir
# + super_category_dir
# + category_dir
# + foreground_image.png
self.foregrounds_dict = dict()
for super_category_dir in self.foregrounds_dir.iterdir():
if not super_category_dir.is_dir():
warnings.warn(
f"file found in foregrounds directory (expected super-category directories), ignoring: "
f"{super_category_dir}"
)
continue
# This is a super category directory
for category_dir in super_category_dir.iterdir():
if not category_dir.is_dir():
warnings.warn(
f"file found in super category directory (expected category directories), ignoring: "
f"{category_dir}"
)
continue
# This is a category directory
for image_file in category_dir.iterdir():
if not image_file.is_file():
warnings.warn(
f"a directory was found inside a category directory, ignoring: {str(image_file)}"
)
continue
if image_file.suffix != ".png":
warnings.warn(
f"foreground must be a .png file, skipping: {str(image_file)}"
)
continue
# Valid foreground image, add to foregrounds_dict
super_category = super_category_dir.name
category = category_dir.name
if super_category not in self.foregrounds_dict:
self.foregrounds_dict[super_category] = dict()
if category not in self.foregrounds_dict[super_category]:
self.foregrounds_dict[super_category][category] = []
self.foregrounds_dict[super_category][category].append(image_file)
assert len(self.foregrounds_dict) > 0, "no valid foregrounds were found"
def _validate_and_process_backgrounds(self):
self.backgrounds = []
for image_file in self.backgrounds_dir.iterdir():
if not image_file.is_file():
warnings.warn(
f"a directory was found inside the backgrounds directory, ignoring: {image_file}"
)
continue
if image_file.suffix not in self.allowed_background_types:
warnings.warn(
f"background must match an accepted type {str(self.allowed_background_types)}, ignoring: "
f"{image_file}"
)
continue
# Valid file, add to background list
self.backgrounds.append(image_file)
assert len(self.backgrounds) > 0, "no valid backgrounds were found"
def _generate_images(self):
# Generates a number of images and creates segmentation masks, then
# saves a mask_definitions.json file that describes the dataset.
if ImageComposition.verbose:
print(f"Generating {self.count} images with masks...")
mju = MaskJsonUtils(self.output_dir)
# Create all images/masks (with tqdm to have a progress bar)
for i in progress_bar(range(self.count)):
# Randomly choose a background
background_path = random.choice(self.backgrounds)
num_foregrounds = random.randint(1, self.max_foregrounds)
foregrounds = []
for fg_i in range(num_foregrounds):
# Randomly choose a foreground
super_category = random.choice(list(self.foregrounds_dict.keys()))
category = random.choice(
list(self.foregrounds_dict[super_category].keys())
)
foreground_path = random.choice(
self.foregrounds_dict[super_category][category]
)
# Get the color
mask_rgb_color = self.mask_colors[fg_i]
foregrounds.append(
{
"super_category": super_category,
"category": category,
"foreground_path": foreground_path,
"mask_rgb_color": mask_rgb_color,
}
)
# Compose foregrounds and background
composite, mask = self._compose_images(foregrounds, background_path)
# Create the file name (used for both composite and mask)
save_filename = f"{i:0{self.zero_padding}}" # e.g. 00000023.jpg
# Save composite image to the images sub-directory
composite_filename = (
f"{save_filename}{self.output_type}" # e.g. 00000023.jpg
)
composite_path = self.output_dir / "images" / composite_filename # e.g.
# my_output_dir/images/00000023.jpg
composite = composite.convert("RGB") # remove alpha
composite.save(composite_path)
# Save the mask image to the masks sub-directory
mask_filename = f"{save_filename}.png" # masks are always png to avoid lossy compression
mask_path = (
self.output_dir / "masks" / mask_filename
) # e.g. my_output_dir/masks/00000023.png
mask.save(mask_path)
color_categories = dict()
for fg in foregrounds:
# Add category and color info
mju.add_category(fg["category"], fg["super_category"])
color_categories[str(fg["mask_rgb_color"])] = {
"category": fg["category"],
"super_category": fg["super_category"],
}
# Add the mask to MaskJsonUtils
mju.add_mask(
composite_path.relative_to(self.output_dir).as_posix(),
mask_path.relative_to(self.output_dir).as_posix(),
color_categories,
)
# Write masks to json
mju.write_masks_to_json()
def _compose_images(self, foregrounds, background_path):
# Composes a foreground image and a background image and creates a segmentation mask
# using the specified color. Validation should already be done by now.
# Args:
# foregrounds: a list of dicts with format:
# [{
# 'super_category':super_category,
# 'category':category,
# 'foreground_path':foreground_path,
# 'mask_rgb_color':mask_rgb_color
# },...]
# background_path: the path to a valid background image
# Returns:
# composite: the composed image
# mask: the mask image
# Open background and convert to RGBA
background = Image.open(background_path)
background = background.convert("RGBA")
# Crop background to desired size (self.width x self.height), randomly positioned
bg_width, bg_height = background.size
max_crop_x_pos = bg_width - self.width
max_crop_y_pos = bg_height - self.height
assert max_crop_x_pos >= 0, (
f"desired width, {self.width}, is greater than background width, "
f"{bg_width}, for {str(background_path)}"
)
assert max_crop_y_pos >= 0, (
f"desired height, {self.height}, is greater than backgrou"
f"nd height, {bg_height}, for {str(background_path)}"
)
crop_x_pos = random.randint(0, max_crop_x_pos)
crop_y_pos = random.randint(0, max_crop_y_pos)
composite = background.crop(
(crop_x_pos, crop_y_pos, crop_x_pos + self.width, crop_y_pos + self.height)
)
composite_mask = Image.new("RGB", composite.size, 0)
for fg in foregrounds:
fg_path = fg["foreground_path"]
# Perform transformations
fg_image = self._transform_foreground(fg, fg_path)
# Choose a random x,y position for the foreground
max_x_position = composite.size[0] - fg_image.size[0]
max_y_position = composite.size[1] - fg_image.size[1]
assert max_x_position >= 0 and max_y_position >= 0, (
f"foreground {fg_path} is too big ({fg_image.size[0]}x{fg_image.size[1]}) for the requested"
f"output size ({self.width}x{self.height}), check your input parameters"
)
paste_position = (
random.randint(0, max_x_position),
random.randint(0, max_y_position),
)
# Create a new foreground image as large as the composite and paste it on top
new_fg_image = Image.new("RGBA", composite.size, color=(0, 0, 0, 0))
new_fg_image.paste(fg_image, paste_position)
# Extract the alpha channel from the foreground and paste it into a new image the size of the composite
alpha_mask = fg_image.getchannel(3)
new_alpha_mask = Image.new("L", composite.size, color=0)
new_alpha_mask.paste(alpha_mask, paste_position)
composite = Image.composite(new_fg_image, composite, new_alpha_mask)
# Grab the alpha pixels above a specified threshold
alpha_threshold = 200
mask_arr = numpy.array(
numpy.greater(numpy.array(new_alpha_mask), alpha_threshold),
dtype=numpy.uint8,
)
uint8_mask = numpy.uint8(mask_arr) # This is composed of 1s and 0s
# Multiply the mask value (1 or 0) by the color in each RGB channel and combine to get the mask
mask_rgb_color = fg["mask_rgb_color"]
red_channel = uint8_mask * mask_rgb_color[0]
green_channel = uint8_mask * mask_rgb_color[1]
blue_channel = uint8_mask * mask_rgb_color[2]
rgb_mask_arr = numpy.dstack((red_channel, green_channel, blue_channel))
isolated_mask = Image.fromarray(rgb_mask_arr, "RGB")
isolated_alpha = Image.fromarray(uint8_mask * 255, "L")
composite_mask = Image.composite(
isolated_mask, composite_mask, isolated_alpha
)
return composite, composite_mask
def _transform_foreground(self, fg, fg_path):
# Open foreground and get the alpha channel
fg_image = Image.open(fg_path)
fg_alpha = numpy.array(fg_image.getchannel(3))
assert numpy.any(
fg_alpha == 0
), f"foreground needs to have some transparency: {str(fg_path)}"
# ** Apply Transformations **
# Rotate the foreground
angle_degrees = random.randint(0, 359)
fg_image = fg_image.rotate(angle_degrees, resample=Image.BICUBIC, expand=True)
# Scale the foreground
scale = random.random() * 0.5 + 0.5 # Pick something between .5 and 1
new_size = (int(fg_image.size[0] * scale), int(fg_image.size[1] * scale))
fg_image = fg_image.resize(new_size, resample=Image.BICUBIC)
# Adjust foreground brightness
brightness_factor = (
random.random() * 0.4 + 0.7
) # Pick something between .7 and 1.1
enhancer = ImageEnhance.Brightness(fg_image)
fg_image = enhancer.enhance(brightness_factor)
# Add any other transformations here...
return fg_image
def _create_info(self):
# A convenience wizard for automatically creating dataset info
# The user can always modify the resulting .json manually if needed
if self.silent:
# No user wizard in silent mode
return
should_continue = input(
"Would you like to create dataset info json? (y/n) "
).lower()
if should_continue != "y" and should_continue != "yes":
print("No problem. You can always create the json manually.")
quit()
if ImageComposition.verbose:
print(
"Note: you can always modify the json manually if you need to update this."
)
info = dict()
info["description"] = input("Description: ")
info["url"] = input("URL: ")
info["version"] = input("Version: ")
info["contributor"] = input("Contributor: ")
now = datetime.now()
info["year"] = now.year
info["date_created"] = f"{now.month:0{2}}/{now.day:0{2}}/{now.year}"
image_license = dict()
image_license["id"] = 0
should_add_license = input("Add an image license? (y/n) ").lower()
if should_add_license != "y" and should_add_license != "yes":
image_license["url"] = ""
image_license["name"] = "None"
else:
image_license["name"] = input("License name: ")
image_license["url"] = input("License URL: ")
dataset_info = dict()
dataset_info["info"] = info
dataset_info["license"] = image_license
# Write the JSON output file
output_file_path = Path(self.output_dir) / "dataset_info.json"
with open(output_file_path, "w+") as json_file:
json_file.write(json.dumps(dataset_info))
if ImageComposition.verbose:
print(f"Successfully created {output_file_path}")
def __call__(self, args):
self._validate_and_process_args(args)
self._generate_images()
self._create_info()
if ImageComposition.verbose:
print("Image composition completed.")
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description="Image Composition")
parser.add_argument(
"--input_dir",
type=str,
dest="input_dir",
required=True,
help=(
"The input directory. This contains a 'backgrounds' directory of pngs or jpgs, and a 'foregrounds' "
"directory which contains super category directories (e.g. 'animal', 'vehicle'), each of which contain "
"category directories (e.g. 'horse', 'bear'). Each category directory contains png images of that item on "
"a transparent background (e.g. a grizzly bear on a transparent background)."
),
)
parser.add_argument(
"--output_dir",
type=str,
dest="output_dir",
required=True,
help="The directory where "
"images, masks, \
and json files will be placed",
)
parser.add_argument(
"--count",
type=int,
dest="count",
required=True,
help="number of composed images to create",
)
parser.add_argument(
"--width",
type=int,
dest="width",
required=True,
help="output image pixel width",
)
parser.add_argument(
"--height",
type=int,
dest="height",
required=True,
help="output image pixel height",
)
parser.add_argument(
"--output_type", type=str, dest="output_type", help="png or jpg (default)"
)
parser.add_argument(
"--silent",
action="store_true",
help="silent mode; doesn't prompt the user for input, automatically overwrites files",
)
config = parser.parse_args()
image_comp = ImageComposition()
image_comp(config)