ModelPackageFileCacheImpl.java

/**
 * Copyright (c) 2018 European Organisation for Nuclear Research (CERN), All Rights Reserved.
 */

package org.jmad.modelpack.cache.impl;

import static cern.accsoft.steering.jmad.modeldefs.io.impl.ModelDefinitionUtil.ZIP_FILE_EXTENSION;
import static cern.accsoft.steering.jmad.modeldefs.io.impl.ModelDefinitionUtil.isZipFileName;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toSet;

import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.Reader;
import java.io.Writer;
import java.nio.file.Files;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

import cern.accsoft.steering.jmad.util.StreamUtil;
import cern.accsoft.steering.jmad.util.TempFileUtil;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import org.jmad.modelpack.cache.ModelPackageFileCache;
import org.jmad.modelpack.domain.ModelPackageVariant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.io.Resource;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class ModelPackageFileCacheImpl implements ModelPackageFileCache {

    private static final Logger LOGGER = LoggerFactory.getLogger(ModelPackageFileCacheImpl.class);
    private static final String CACHE_SUBDIR = "package-cache";

    private final File cacheDir;
    private final Gson gson = new GsonBuilder().setPrettyPrinting().create();

    /**
     * contains all the files which were are used. The purpose here is to return the same file instances each time, so
     * that we can lock on them for checking if they exist or when writing to them. This way, at least we should be able
     * to avoid concurrency issues within the same instance of this cache.
     * <p>
     * ... Concurrency issues within different processes is another story and is not yet addressed.
     */
    private final Map<ModelPackageVariant, File> packageFiles = new HashMap<>();

    public ModelPackageFileCacheImpl(TempFileUtil tempFileUtil) {
        requireNonNull(tempFileUtil, "tempFileUtil must not be null");
        this.cacheDir = tempFileUtil.getOutputDir(CACHE_SUBDIR);
    }

    @Override
    public Mono<File> fileFor(ModelPackageVariant packageVariant,
            Function<ModelPackageVariant, Mono<Resource>> zipFileResourceCallback) {
        File packageFile = packageFile(packageVariant);
        synchronized (packageFile) {
            if (packageFile.exists()) {
                return Mono.just(packageFile);
            }

            // @formatter:off
            return Mono.just(packageVariant)
                .publishOn(Schedulers.elastic())
                .doOnNext(v -> LOGGER.info("Downloading model package {} to temp file {}.", v, packageFile))
                .flatMap(zipFileResourceCallback)
                .map(r -> {
                        synchronized (packageFile) {
                            return downloadFile(packageVariant, r, packageFile);
                        }
                    });
            // @formatter:on
        }
    }

    @Override
    public Flux<ModelPackageVariant> cachedPackageVariants() {
        // @formatter:off
        return Flux.fromIterable(existingJsonFiles())
                    .map(this::readMetaInfoFrom)
                    .filter(Optional::isPresent)
                    .map(Optional::get);
        // @formatter:on
    }

    private File downloadFile(ModelPackageVariant packageVariant, Resource zipResource, File file) {
        try {
            LOGGER.info("Storing model package {} to temp file {}.", packageVariant, file.getAbsoluteFile());
            StreamUtil.toFile(zipResource.getInputStream(), file);
            LOGGER.info("Successfully stored model package to file {}.", file.getAbsoluteFile());
            writeMetaInfo(packageVariant);
            return file;
        } catch (IOException e) {
            throw new IllegalStateException("Unable to download package file for package '" + packageVariant + "'");
        }
    }

    private File packageFile(ModelPackageVariant packageVariant) {
        synchronized (packageFiles) {
            File file = packageFiles.get(packageVariant);
            if (file != null) {
                return file;
            }
            file = zipFileFor(packageVariant);
            packageFiles.put(packageVariant, file);
            return file;
        }
    }

    private File zipFileFor(ModelPackageVariant packageVariant) {
        return new File(cacheDir, zipFileName(packageVariant));
    }

    private static String zipFileName(ModelPackageVariant packageVariant) {
        return packageVariant.fullName() + ZIP_FILE_EXTENSION;
    }

    private File jsonFileFor(File zipFile) {
        return new File(zipFile.getAbsolutePath() + ".json");
    }

    @Override
    public Mono<Void> clear() {
        return Mono.fromRunnable(() -> {
            synchronized (packageFiles) {
                Set<ModelPackageVariant> deletedKeys = new HashSet<>();
                packageFiles.forEach((key, file) -> {
                    if (deleteCacheEntry(file)) {
                        deletedKeys.add(key);
                    }
                });
                deletedKeys.forEach(packageFiles::remove);

                /* Also try to remove the rest of the files, even if they were not in the map */
                cachedZipFiles().forEach(this::deleteCacheEntry);

                LOGGER.info("Caches cleared !");
            }
        });
    }

    private Set<File> cachedZipFiles() {
        // @formatter:off
        return Arrays.stream(cacheDir.listFiles())
                .filter(f -> isZipFileName(f.getName()))
                .collect(toSet());
        // @formatter:on
    }

    private boolean deleteCacheEntry(File zipFile) {
        deleteFile(jsonFileFor(zipFile));
        return deleteFile(zipFile);
    }

    private Set<File> existingJsonFiles() {
        // @formatter:off
       return cachedZipFiles().stream()
               .map(this::jsonFileFor)
               .filter(File::exists)
               .collect(Collectors.toSet());
        // @formatter:on
    }

    private boolean deleteFile(File file) {
        try {
            Files.delete(file.toPath());
            LOGGER.info("Deleted file {}.", file);
            return true;
        } catch (IOException e) {
            LOGGER.warn("File {} could not be deleted.", file, e);
            return false;
        }
    }

    private void writeMetaInfo(ModelPackageVariant packageVariant) {
        File file = jsonFileFor(zipFileFor(packageVariant));

        try (Writer writer = new FileWriter(file)) {
            gson.toJson(packageVariant, writer);
            LOGGER.info("Successfully stored meta info for packageVariant {} in file {}.", packageVariant, file);
        } catch (IOException e) {
            LOGGER.error("Meta info for packageVariant {} could not be written to file {}.", packageVariant, file, e);
        }
    }

    private Optional<ModelPackageVariant> readMetaInfoFrom(File jsonFile) {
        try (Reader writer = new FileReader(jsonFile)) {
            ModelPackageVariant packageVariant = gson.fromJson(writer, ModelPackageVariant.class);
            LOGGER.info("Successfully read meta info for packageVariant {} from file {}.", packageVariant, jsonFile);
            return Optional.of(packageVariant);
        } catch (IOException e) {
            LOGGER.error("Meta info could not be read from file {}.", jsonFile, e);
            return Optional.empty();
        }
    }

}