package org.apache.james.webadmin.service;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import jakarta.inject.Inject;
import java.util.Optional;
import java.util.Set;
import org.apache.james.events.DispatchingFailureGroup;
import org.apache.james.events.Event;
import org.apache.james.events.EventBus;
import org.apache.james.events.EventDeadLetters;
import org.apache.james.events.Group;
import org.apache.james.task.Task;
import org.apache.james.util.streams.Limit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/apache/james/webadmin/service/EventDeadLettersRedeliverService.class */
public class EventDeadLettersRedeliverService {
    private static final Logger LOGGER = LoggerFactory.getLogger(EventDeadLettersRedeliverService.class);
    private final Set<EventBus> eventBuses;
    private final EventDeadLetters deadLetters;

    /* loaded from: input_file:org/apache/james/webadmin/service/EventDeadLettersRedeliverService$RunningOptions.class */
    public static class RunningOptions {
        public static RunningOptions DEFAULT = new RunningOptions(Limit.unlimited());
        private final Limit limit;

        public RunningOptions(Limit limit) {
            this.limit = limit;
        }

        @JsonCreator
        public RunningOptions(@JsonProperty("limit") Integer num) {
            this.limit = Limit.from(Optional.ofNullable(num));
        }

        @JsonProperty("limit")
        public Optional<Integer> limitValue() {
            return this.limit.getLimit();
        }

        public Limit limit() {
            return this.limit;
        }
    }

    @Inject
    @VisibleForTesting
    public EventDeadLettersRedeliverService(Set<EventBus> set, EventDeadLetters eventDeadLetters) {
        this.eventBuses = set;
        this.deadLetters = eventDeadLetters;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Flux<Task.Result> redeliverEvents(EventRetriever eventRetriever, RunningOptions runningOptions) {
        return runningOptions.limit().applyOnFlux(eventRetriever.retrieveEvents(this.deadLetters)).flatMap(tuple3 -> {
            return redeliverGroupEvents((Group) tuple3.getT1(), (Event) tuple3.getT2(), (EventDeadLetters.InsertionId) tuple3.getT3());
        });
    }

    private Mono<Task.Result> redeliverGroupEvents(Group group, Event event, EventDeadLetters.InsertionId insertionId) {
        return (Mono) findEventBus(group).map(eventBus -> {
            return eventBus.reDeliver(group, event).then(this.deadLetters.remove(group, insertionId)).thenReturn(Task.Result.COMPLETED).onErrorResume(th -> {
                LOGGER.error("Error while performing redelivery of event: {} for group: {}", new Object[]{event.getEventId().toString(), group.asString(), th});
                return Mono.just(Task.Result.PARTIAL);
            });
        }).orElseGet(() -> {
            LOGGER.error("No eventBus associated. event: {} for group: {}", event.getEventId().toString(), group.asString());
            return Mono.just(Task.Result.PARTIAL);
        });
    }

    private Optional<EventBus> findEventBus(Group group) {
        if (!(group instanceof DispatchingFailureGroup)) {
            return this.eventBuses.stream().filter(eventBus -> {
                return eventBus.listRegisteredGroups().contains(group);
            }).findFirst();
        }
        DispatchingFailureGroup dispatchingFailureGroup = (DispatchingFailureGroup) group;
        return this.eventBuses.stream().filter(eventBus2 -> {
            return eventBus2.eventBusName().equals(dispatchingFailureGroup.getEventBusName());
        }).findFirst();
    }
}
