Files
Robust_image_source_identif…/datasets/raise/attribute_source_camera.py
2024-05-15 20:15:20 +02:00

219 lines
14 KiB
Python
Executable File

#!/usr/bin/env python
import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm
from utils import denoise, iterativeMean, getColorChannel, escapeFilePath, Color, mergeSingleColorChannelImagesAccordingToBayerFilter, rescaleRawImageForDenoiser, updateExtremes, saveNpArray, getColorMeans, getImageCrop, Denoiser, Distance
import os
import random
# Configuration:
DENOISER = Denoiser.MEAN
IMAGES_CAMERAS_FOLDER = {
'RAISE': 'flat-field/nef',
'Rafael 23/04/24': 'rafael/230424',
}
TRAINING_PORTION = 0.5
PREDICT_ONLY_ON_WHOLE_TRAINING_SET = False
DISTANCE = Distance.ROOT_MEAN_SQUARE
# See [Robust_image_source_identification_on_modern_smartphones/issues/22](https://gitea.lemnoslife.com/Benjamin_Loison/Robust_image_source_identification_on_modern_smartphones/issues/22).
match DISTANCE:
case Distance.ROOT_MEAN_SQUARE:
import sys
sys.path.insert(0, '../../algorithms/distance/')
from rms_diff import rmsDiffNumpy
case PEARSON_CORRELATION:
import scipy
from utils import getPearsonCorrelationDistance
setting = ','.join([escapeFilePath(imageCameraFolder) for imageCameraFolder in IMAGES_CAMERAS_FOLDER]) + f'_{DENOISER}'
imagesCamerasFileNames = {camera: [imageCameraFile for imageCameraFile in os.listdir(imageCameraFolder) if imageCameraFile.endswith('.NEF') or imageCameraFile.endswith('.ARW')] for camera, imageCameraFolder in IMAGES_CAMERAS_FOLDER.items()}
# Fix randomness for reproducibility.
random.seed(0)
# Randomize order to not have a bias (chronological for instance) when split to make training and testing sets.
for camera in IMAGES_CAMERAS_FOLDER:
random.shuffle(imagesCamerasFileNames[camera])
# Limit number of images per camera with the one having the less images.
minimumNumberOfImagesCameras = min([len(imagesCamerasFileNames[camera]) for camera in IMAGES_CAMERAS_FOLDER])
for camera in IMAGES_CAMERAS_FOLDER:
imagesCamerasFileNames[camera] = imagesCamerasFileNames[camera][:minimumNumberOfImagesCameras]
print(camera, imagesCamerasFileNames[camera])
numberOfCameras = len(IMAGES_CAMERAS_FOLDER)
camerasIterativeMean = {camera: iterativeMean() for camera in IMAGES_CAMERAS_FOLDER}
# Compute the minimal color channel camera resolution.
# Assume that for each camera, its images have the same resolution.
# The following consider a given color channel resolution, assuming they all have the same resolution.
minimalColorChannelCameraResolution = (100, 100)#None
if minimalColorChannelCameraResolution is None:
for camera in IMAGES_CAMERAS_FOLDER:
imageFileName = imagesCamerasFileNames[camera][0]
imageFilePath = f'{IMAGES_CAMERAS_FOLDER[camera]}/{imageFileName}'
singleColorChannelImagesShape = getColorChannel(imageFilePath, Color.RED).shape
if minimalColorChannelCameraResolution is None or singleColorChannelImagesShape < minimalColorChannelCameraResolution:
minimalColorChannelCameraResolution = singleColorChannelImagesShape
minColor = 0#None
maxColor = 7952#None
accuracy = []
numberOfTrainingImages = int(minimumNumberOfImagesCameras * TRAINING_PORTION)
numberOfTestingImages = minimumNumberOfImagesCameras - int(minimumNumberOfImagesCameras * TRAINING_PORTION)
cameraTestingImagesNoise = {}
def getImageFilePath(camera, cameraImageIndex):
imageFileName = imagesCamerasFileNames[camera][cameraImageIndex]
imageFilePath = f'{IMAGES_CAMERAS_FOLDER[camera]}/{imageFileName}'
return imageFilePath
def getSingleColorChannelImages(camera, cameraImageIndex):
imageFilePath = getImageFilePath(camera, cameraImageIndex)
singleColorChannelImages = {color: rescaleIfNeeded(getImageCrop(getColorChannel(imageFilePath, color), minimalColorChannelCameraResolution), minColor, maxColor) for color in Color}
return singleColorChannelImages
def getMultipleColorsImage(singleColorChannelImages):
multipleColorsImage = mergeSingleColorChannelImagesAccordingToBayerFilter(singleColorChannelImages)
return multipleColorsImage
def getImagePrnuEstimateNpArray(singleColorChannelImages, multipleColorsImage, camera):
singleColorChannelDenoisedImages = {}
for color in Color:
if DENOISER != Denoiser.MEAN:
singleColorChannelDenoisedImage = denoise(singleColorChannelImages[color], DENOISER)
else:
cameraColorMean = cameraColorMeans[camera][color]
if PREDICT_ONLY_ON_WHOLE_TRAINING_SET:
singleColorChannelDenoisedImage = cameraColorMean
else:
cameraColorCurrentMean = cameraColorMean.mean
singleColorChannelDenoisedImage = cameraColorCurrentMean
singleColorChannelDenoisedImages[color] = singleColorChannelDenoisedImage
multipleColorsDenoisedImage = mergeSingleColorChannelImagesAccordingToBayerFilter(singleColorChannelDenoisedImages)
imagePrnuEstimateNpArray = multipleColorsImage - multipleColorsDenoisedImage
return imagePrnuEstimateNpArray
imagesCamerasFilePaths = {camera: [f'{IMAGES_CAMERAS_FOLDER[camera]}/{imagesCamerasFileName}' for imagesCamerasFileName in imagesCamerasFileNames[camera]] for camera in imagesCamerasFileNames}
# If the denoiser is `MEAN`:
# If `PREDICT_ONLY_ON_WHOLE_TRAINING_SET`, then compute the means of camera images to empower the `MEAN` denoiser.
# Otherwise initialize these means of camera images to `iterativeMean`.
if DENOISER == Denoiser.MEAN:
cameraColorMeans = {camera: (getColorMeans(imagesCamerasFilePaths[camera][:numberOfTrainingImages], Color, minimalColorChannelCameraResolution) if PREDICT_ONLY_ON_WHOLE_TRAINING_SET else {color: iterativeMean() for color in Color}) for camera in imagesCamerasFilePaths}
from utils import silentTqdm
#tqdm = silentTqdm
returnSingleColorChannelImage = lambda singleColorChannelImage, _minColor, _maxColor: singleColorChannelImage
# Assume to have `{min,max}Color` hardcoded.
# Can just load to memory `getSingleColorChannelImages`, see [Robust_image_source_identification_on_modern_smartphones/issues/62#issuecomment-1861](https://gitea.lemnoslife.com/Benjamin_Loison/Robust_image_source_identification_on_modern_smartphones/issues/62#issuecomment-1861).
'''
rescaleIfNeeded = rescaleRawImageForDenoiser
cameraTrainingImages = {}
for cameraTrainingImageIndex in tqdm(range(numberOfTrainingImages), 'Load to memory camera training image'):
for cameraIndex, camera in enumerate(IMAGES_CAMERAS_FOLDER):
singleColorChannelImages = getSingleColorChannelImages(camera, cameraTrainingImageIndex)
multipleColorsImage = getMultipleColorsImage(singleColorChannelImages)
cameraTrainingImages[camera] = cameraTrainingImages.get(camera, []) + [multipleColorsImage]
singleColorChannelTestingImages = {camera: [] for camera in IMAGES_CAMERAS_FOLDER}
for camera in IMAGES_CAMERAS_FOLDER:
for cameraTestingImageIndex in tqdm(range(numberOfTestingImages), 'Load to memory camera testing image'):
singleColorChannelImages = getSingleColorChannelImages(camera, numberOfTrainingImages + cameraTestingImageIndex)
singleColorChannelTestingImages[camera] += [singleColorChannelImages]
'''
# 2 loops:
# - the first one is about computing `{min,max}Color`
# - the second one is about estimating better and better the PRNU of each camera, as consider more and more training images and measuring the resulting attribution of cameras
for computeExtremes in tqdm(([True] if minColor is None or maxColor is None else []) + [False], 'Compute extremes'):
rescaleIfNeeded = returnSingleColorChannelImage if computeExtremes else rescaleRawImageForDenoiser
# As the second loop firstly if the denoiser is not `MEAN` or if `MEAN` predicts only on the whole training set, then compute noises of testing images.
if not computeExtremes:
print(f'{minColor=} {maxColor=}')
if DENOISER != Denoiser.MEAN or PREDICT_ONLY_ON_WHOLE_TRAINING_SET:
print('Extracting noise of testing images')
for camera in tqdm(IMAGES_CAMERAS_FOLDER, 'Camera'):
for cameraTestingImageIndex in tqdm(range(numberOfTestingImages), 'Camera testing image index'):
print(f'{camera=} {numberOfTrainingImages + cameraTestingImageIndex=}')
#singleColorChannelImages = singleColorChannelTestingImages[camera][cameraTestingImageIndex]
singleColorChannelImages = getSingleColorChannelImages(camera, numberOfTrainingImages + cameraTestingImageIndex)
multipleColorsImage = getMultipleColorsImage(singleColorChannelImages)
imagePrnuEstimateNpArray = getImagePrnuEstimateNpArray(singleColorChannelImages, multipleColorsImage, camera)
cameraTestingImagesNoise[camera] = cameraTestingImagesNoise.get(camera, []) + [imagePrnuEstimateNpArray]
# As the first loop compute `{min,max}Color` based on testing images.
# As the second loop improve PRNU estimation by considering an additional testing image and if needed measure current learning step accuracy.
for cameraTrainingImageIndex in tqdm(range(minimumNumberOfImagesCameras if computeExtremes else numberOfTrainingImages), 'Camera training image index'):
for cameraIndex, camera in enumerate(tqdm(IMAGES_CAMERAS_FOLDER, 'Camera')):
singleColorChannelImages = getSingleColorChannelImages(camera, cameraTrainingImageIndex)
multipleColorsImage = getMultipleColorsImage(singleColorChannelImages)
if computeExtremes:
minColor, maxColor = updateExtremes(multipleColorsImage, minColor, maxColor)
continue
if DENOISER == Denoiser.MEAN and not PREDICT_ONLY_ON_WHOLE_TRAINING_SET:
for color in Color:
cameraColorMeans[camera][color].add(singleColorChannelImages[color])
imagePrnuEstimateNpArray = getImagePrnuEstimateNpArray(singleColorChannelImages, multipleColorsImage, camera)
cameraIterativeMean = camerasIterativeMean[camera]
if DENOISER != Denoiser.MEAN or PREDICT_ONLY_ON_WHOLE_TRAINING_SET:
cameraIterativeMean.add(imagePrnuEstimateNpArray)
else:
# Still use `cameraIterativeMean` to simplify the implementation.
cameraIterativeMean.mean = np.mean([cameraTrainingImage - mergeSingleColorChannelImagesAccordingToBayerFilter({color: cameraColorMeans[camera][color].mean for color in Color}) for cameraTrainingImage in cameraTrainingImages[camera][:cameraTrainingImageIndex + 1]], axis = 0)
# If we are considering the last camera and (not `PREDICT_ONLY_ON_WHOLE_TRAINING_SET` or we are considering the last training image), then we proceeded an additional image for all cameras and we can predict the accuracy at this learning step.
if cameraIndex == numberOfCameras - 1 and (not PREDICT_ONLY_ON_WHOLE_TRAINING_SET or cameraTrainingImageIndex == numberOfTrainingImages - 1):
numberOfTrainingImagesAccuracy = 0
print(f'{numberOfTestingImages=} {numberOfCameras=}')
# Loop over each camera testing image folder.
for actualCamera in IMAGES_CAMERAS_FOLDER:
for cameraTestingImageIndex in tqdm(range(numberOfTestingImages), 'Camera testing image index'):
cameraPredicted = None
minimalDistance = None
#plt.imsave(f'{escapeFilePath(actualCamera)}_{cameraTestingImageIndex}.png', cameraTestingImagesNoise[actualCamera][cameraTestingImageIndex])
# Loop over each camera to compute closeness between the considered testing image noise and the estimated PRNUs of the various cameras.
for camera in IMAGES_CAMERAS_FOLDER:
if DENOISER != Denoiser.MEAN:
cameraTestingImageNoise = cameraTestingImagesNoise[actualCamera][cameraTestingImageIndex]
else:
#singleColorChannelImages = singleColorChannelTestingImages[camera][cameraTestingImageIndex]
singleColorChannelImages = getSingleColorChannelImages(camera, numberOfTrainingImages + cameraTestingImageIndex)
multipleColorsImage = getMultipleColorsImage(singleColorChannelImages)
cameraTestingImageNoise = getImagePrnuEstimateNpArray(singleColorChannelImages, multipleColorsImage, camera)
match DISTANCE:
case ROOT_MEAN_SQUARE:
distance = rmsDiffNumpy(cameraTestingImageNoise, camerasIterativeMean[camera].mean)
case PEARSON_CORRELATION:
distance = getPearsonCorrelationDistance(cameraTestingImagesNoise[actualCamera][cameraTestingImageIndex], camerasIterativeMean[camera].mean)
print(f'{cameraTrainingImageIndex=} {cameraTestingImageIndex=} {camera=} {actualCamera=} {distance=}')
if minimalDistance is None or distance < minimalDistance:
minimalDistance = distance
cameraPredicted = camera
print(f'Predicted camera {cameraPredicted} {"good" if cameraPredicted == actualCamera else "bad"}')
if cameraPredicted == actualCamera:
numberOfTrainingImagesAccuracy += 1
accuracy += [numberOfTrainingImagesAccuracy / (numberOfTestingImages * numberOfCameras)]
# Save the estimated PRNU of each camera after having trained on the whole training set.
for camera in IMAGES_CAMERAS_FOLDER:
plt.imsave(f'{setting}_estimated_prnu_camera_{escapeFilePath(camera)}.png', (camerasIterativeMean[camera].mean))
# Plot and save the accuracy of camera source attribution thanks to a given number of images to estimate PRNUs with `DENOISER` denoiser.
plt.title(f'Accuracy of camera source attribution thanks to a given number of images to estimate PRNUs with {DENOISER} denoiser')
plt.xlabel('Number of images to estimate PRNU')
plt.ylabel('Accuracy of camera source attribution')
plt.plot(accuracy)
internalTitle = f'{setting}_accuracy_of_camera_source_attribution'
saveNpArray(internalTitle, accuracy)
plt.savefig(f'{internalTitle}.svg')