// MultiConnectorModelPackageService.java

/**
 * Copyright (c) 2018 European Organisation for Nuclear Research (CERN), All Rights Reserved.
 */

package org.jmad.modelpack.service.impl;

import static java.util.stream.Collectors.toList;
import static org.jmad.modelpack.service.JMadModelPackageService.Mode.ONLINE;
import static org.jmad.modelpack.util.ModelUris.findModelDefinitionFromUri;
import static org.jmad.modelpack.util.ModelUris.startupConfigurationFromUri;

import java.io.File;
import java.net.URI;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

import javax.annotation.PostConstruct;

import cern.accsoft.steering.jmad.factory.JMadModelFactory;
import cern.accsoft.steering.jmad.model.JMadModel;
import cern.accsoft.steering.jmad.model.JMadModelStartupConfiguration;
import cern.accsoft.steering.jmad.modeldefs.domain.JMadModelDefinitionImpl;
import org.jmad.modelpack.cache.ModelPackageFileCache;
import org.jmad.modelpack.connect.DirectModelPackageConnector;
import org.jmad.modelpack.connect.ModelPackageConnector;
import org.jmad.modelpack.connect.ZipModelPackageConnector;
import org.jmad.modelpack.domain.JMadModelPackageRepository;
import org.jmad.modelpack.domain.ModelPackageVariant;
import org.jmad.modelpack.service.JMadModelPackageRepositoryProvider;
import org.jmad.modelpack.service.JMadModelPackageService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.io.Resource;

import cern.accsoft.steering.jmad.modeldefs.domain.JMadModelDefinition;
import cern.accsoft.steering.jmad.service.JMadService;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * {@link JMadModelPackageService} implementation which spreads its work over all injected
 * {@link ModelPackageConnector}s.
 * <p>
 * The service has two modes. It starts in {@link Mode#ONLINE}, in which packages are listed through the connectors
 * and zip resources may be fetched from them. In any other mode, packages are listed from the
 * {@link ModelPackageFileCache} only and every attempt to fetch a zip resource results in an error.
 * <p>
 * All collaborators are injected by Spring through field injection.
 */
public class MultiConnectorModelPackageService implements JMadModelPackageService {

    private static final Logger LOGGER = LoggerFactory.getLogger(MultiConnectorModelPackageService.class);

    /** Source of the repositories that are queried for packages while in ONLINE mode. */
    @Autowired
    private JMadModelPackageRepositoryProvider provider;
    /** All connectors known to the application context; each request is dispatched to the ones that apply. */
    @Autowired
    private List<ModelPackageConnector> connectors;
    /** Used to import model definitions from package files and to create models. */
    @Autowired
    private JMadService jMadService;
    /** Provides the local files for model packages and the list of cached package variants. */
    @Autowired
    private ModelPackageFileCache cache;

    /** Current operation mode; held in an atomic reference as it is read and written through separate calls. */
    private final AtomicReference<Mode> mode = new AtomicReference<>(ONLINE);

    /**
     * Logs the injected connectors once the bean is constructed.
     */
    @PostConstruct
    public void init() {
        LOGGER.info("Available model package connectors: {}.", connectors);
    }

    /**
     * Lists the available model packages.
     *
     * @return in ONLINE mode, the packages of all enabled repositories as delivered by the connectors; otherwise the
     *         package variants reported by the cache
     */
    @Override
    public Flux<ModelPackageVariant> availablePackages() {
        if (ONLINE == mode.get()) {
            return provider.enabledRepositories().flatMap(this::packagesFrom);
        }
        return cache.cachedPackageVariants();
    }

    /**
     * Selects the function which is handed to the cache for retrieving a zip resource. The mode is read at the
     * moment this method is called.
     *
     * @return a function fetching the zip through the connectors in ONLINE mode, otherwise one that always fails
     */
    private Function<ModelPackageVariant, Mono<Resource>> resourceCallback() {
        if (ONLINE == mode.get()) {
            return this::zipResourceFrom;
        }
        return this::errorOffline;
    }

    /**
     * Creates a model from the given URI: the package is resolved, all its model definitions are collected, and the
     * definition and startup configuration are then looked up from the URI.
     *
     * @param uri the URI describing package, model definition and startup configuration
     * @return a mono emitting the created model
     * @throws IllegalArgumentException synchronously (not as an error signal), if no connector can handle the URI;
     *             see {@link #packageFromUri(URI)}
     */
    @Override
    public Mono<JMadModel> createModelFromUri(URI uri) {
        return packageFromUri(uri).flatMapMany(this::modelDefinitionsFrom).collectList().map(modelPackList -> {
            JMadModelDefinition modelDefinition = findModelDefinitionFromUri(uri, modelPackList);
            JMadModelStartupConfiguration startupConfiguration = startupConfigurationFromUri(uri, modelDefinition);
            return jMadService.createModel(modelDefinition, startupConfiguration);
        });
    }

    /**
     * Resolves the package variant for the given URI using the first connector (in injection order) that reports it
     * can handle the URI.
     *
     * @param uri the URI to resolve
     * @return the mono returned by the selected connector
     * @throws IllegalArgumentException if none of the connectors can handle the URI. The exception is thrown
     *             directly by this call and is not delivered through the returned mono.
     */
    @Override
    public Mono<ModelPackageVariant> packageFromUri(URI uri) {
        return connectors.stream().filter(c -> c.canHandle(uri)).findFirst()
                .orElseThrow(() -> new IllegalArgumentException("No connector can handle URI " + uri))
                .packageFromUri(uri);
    }

    /**
     * Retrieves the model definitions of a package. Direct connectors are asked first; only if they deliver nothing,
     * the package file is obtained from the cache and imported. Each emitted definition gets the URI of the package
     * assigned.
     *
     * @param modelPackage the package for which to retrieve the definitions
     * @return a flux of the model definitions contained in the package
     */
    @Override
    public Flux<JMadModelDefinition> modelDefinitionsFrom(ModelPackageVariant modelPackage) {
        /*
         * switchIfEmpty() takes an already constructed publisher. Calling definitionsFromFile() directly as the
         * argument would therefore invoke cache.fileFor() and read the mode on every call, even if the direct
         * connectors deliver definitions. Deferring postpones both until the fallback is really subscribed.
         */
        return definitionsFromDirect(modelPackage)
                .switchIfEmpty(Flux.defer(() -> definitionsFromFile(modelPackage)))
                .doOnNext(md -> setModelPackUri(md, modelPackage));
    }

    /**
     * Stores the ASCII string form of the package URI in the given model definition.
     * <p>
     * NOTE(review): the definition is cast to {@link JMadModelDefinitionImpl} without a check, so any other
     * implementation results in a {@link ClassCastException}.
     */
    private static void setModelPackUri(JMadModelDefinition md, ModelPackageVariant modelPackage) {
        JMadModelDefinitionImpl modelDefinition = (JMadModelDefinitionImpl) md;
        modelDefinition.setModelPackUri(modelPackage.uri().toASCIIString());
    }

    /**
     * Clears the file cache.
     *
     * @return the mono returned by the cache for the clear operation
     */
    @Override
    public Mono<Void> clearCache() {
        return cache.clear();
    }

    /**
     * Resource callback used when the service is not ONLINE.
     *
     * @param modelPackage ignored
     * @return a mono which fails with an {@link IllegalStateException}
     */
    private Mono<Resource> errorOffline(@SuppressWarnings("unused") ModelPackageVariant modelPackage) {
        return Mono.error(new IllegalStateException("service is in OFFLINE mode. No resource download is possible."));
    }

    /**
     * Asks every {@link DirectModelPackageConnector} among the connectors for the definitions of the package and
     * merges the answers into one flux. The result is empty if there is no such connector.
     */
    private Flux<JMadModelDefinition> definitionsFromDirect(ModelPackageVariant modelPackage) {
        List<Flux<JMadModelDefinition>> directStreams = connectors.stream()
                .filter(DirectModelPackageConnector.class::isInstance)
                .map(DirectModelPackageConnector.class::cast)
                .map(c -> c.modelDefinitionsFor(modelPackage))
                .collect(toList());

        return Flux.merge(directStreams);
    }

    /**
     * Obtains the file of the package from the cache, passing the mode-dependent resource callback, and imports
     * the model definitions from that file.
     */
    private Flux<JMadModelDefinition> definitionsFromFile(ModelPackageVariant modelPackage) {
        return cache.fileFor(modelPackage, resourceCallback()).flatMapMany(this::modelDefinitionsFrom);
    }

    /**
     * Imports all model definitions from the given file using the importer of the JMad service.
     */
    private Flux<JMadModelDefinition> modelDefinitionsFrom(File file) {
        return Flux.fromIterable(jMadService.getModelDefinitionImporter().importModelDefinitions(file));
    }

    /**
     * Asks every {@link ZipModelPackageConnector} among the connectors for the zip resource of the package.
     * <p>
     * NOTE(review): the answers are combined with {@code Mono.first}, which per the Reactor documentation replays
     * the first signal of any source; this includes an empty completion or an error. Confirm that connectors which
     * are not responsible for a package do not answer with such a signal before the responsible one emits.
     */
    private Mono<Resource> zipResourceFrom(ModelPackageVariant modelPackage) {
        List<Mono<Resource>> connectorStreams = connectors.stream()
                .filter(ZipModelPackageConnector.class::isInstance)
                .map(ZipModelPackageConnector.class::cast)
                .map(c -> c.zipResourceFor(modelPackage))
                .collect(toList());

        return Mono.first(connectorStreams);
    }

    /**
     * Collects the packages of one repository from all connectors that can handle it. A failure of a single
     * connector is logged as a warning and replaced by an empty result, so that it does not break the merged flux.
     */
    private Flux<ModelPackageVariant> packagesFrom(JMadModelPackageRepository repo) {
        List<Flux<ModelPackageVariant>> connectorStreams = connectors.stream().filter(c -> c.canHandle(repo))
                .map(c -> c.availablePackages(repo).onErrorResume(t -> {
                    LOGGER.warn("Error while retrieving packages from repo {} from connector {}. Returning empty.",
                            repo, c, t);
                    return Flux.empty();
                })).collect(toList());

        return Flux.merge(connectorStreams);
    }

    /**
     * @return the mode the service is currently in
     */
    @Override
    public Mode mode() {
        return this.mode.get();
    }

    /**
     * Switches the service to the given mode.
     * <p>
     * NOTE(review): {@code null} is not rejected here; the checks in this class would treat a {@code null} mode
     * like any mode other than ONLINE.
     *
     * @param mode the new mode
     */
    @Override
    public void setMode(Mode mode) {
        this.mode.set(mode);
    }

}