package org.apache.james.backends.opensearch.search;

import com.fasterxml.jackson.databind.node.ObjectNode;
import com.github.fge.lambdas.Throwing;
import java.io.IOException;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.apache.james.backends.opensearch.ReactorOpenSearchClient;
import org.opensearch.client.opensearch._types.Time;
import org.opensearch.client.opensearch.core.ClearScrollRequest;
import org.opensearch.client.opensearch.core.ScrollRequest;
import org.opensearch.client.opensearch.core.ScrollResponse;
import org.opensearch.client.opensearch.core.SearchRequest;
import org.opensearch.client.opensearch.core.search.Hit;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/apache/james/backends/opensearch/search/ScrolledSearch.class */
/**
 * Exposes the results of an OpenSearch search as a reactive stream, fetching
 * follow-up pages through scroll requests as the subscriber asks for more.
 *
 * <p>The first page is obtained by running the {@link SearchRequest} given at
 * construction time; every later page is obtained with a {@link ScrollRequest}
 * carrying the scroll id returned by the previous response. When the stream is
 * disposed, a clear-scroll request is sent for the last known scroll id.
 */
public class ScrolledSearch {
    // Sent as the "scroll" parameter of every follow-up scroll request
    // (presumably the keep-alive of the server-side scroll context; confirm against OpenSearch docs).
    private static final Time TIMEOUT = new Time.Builder().time("1m").build();

    private final ReactorOpenSearchClient client;
    private final SearchRequest searchRequest;

    /**
     * @param reactorOpenSearchClient client used to run the search, scroll and clear-scroll requests
     * @param searchRequest           request used to fetch the first page
     */
    public ScrolledSearch(ReactorOpenSearchClient reactorOpenSearchClient, SearchRequest searchRequest) {
        this.client = reactorOpenSearchClient;
        this.searchRequest = searchRequest;
    }

    /**
     * Streams every hit of every page, page after page, in response order.
     *
     * @return a flux of the individual hits contained in the successive responses
     */
    public Flux<Hit<ObjectNode>> searchHits() {
        return searchResponses()
            .concatMap(response -> Flux.fromIterable(response.hits().hits()));
    }

    /**
     * Builds the page-level stream: each downstream request triggers up to that
     * many page fetches, and disposal triggers {@link #close(AtomicReference)}.
     */
    private Flux<ScrollResponse<ObjectNode>> searchResponses() {
        return Flux.push(sink -> {
            // Holds the scroll id of the latest response; empty until the first page arrived.
            AtomicReference<Optional<String>> scrollId = new AtomicReference<>(Optional.empty());
            sink.onRequest(requested -> next(sink, scrollId, requested));
            sink.onDispose(() -> close(scrollId));
        });
    }

    /**
     * Fetches one page, emits it, then chains to the next page until either
     * {@code remaining} pages were emitted or a page without hits was received.
     *
     * @param sink      sink receiving the pages, the completion signal and any error
     * @param scrollId  holder updated with the scroll id of each received page
     * @param remaining number of pages still to fetch for the current demand
     */
    private void next(FluxSink<ScrollResponse<ObjectNode>> sink, AtomicReference<Optional<String>> scrollId, long remaining) {
        if (remaining <= 0) {
            return;
        }

        Consumer<ScrollResponse<ObjectNode>> onResponse = response -> {
            scrollId.set(Optional.of(response.scrollId()));
            sink.next(response);

            // An empty page is emitted too, then terminates the stream.
            if (response.hits().hits().isEmpty()) {
                sink.complete();
            } else {
                next(sink, scrollId, remaining - 1);
            }
        };
        Consumer<Throwable> onFailure = sink::error;

        buildRequest(scrollId.get())
            .subscribe(onResponse, onFailure);
    }

    /**
     * Chooses between the initial search and a follow-up scroll.
     *
     * @param scrollId scroll id of the previous page, or empty when no page was fetched yet
     * @return the pending response; the initial search response is copied into a
     *         {@link ScrollResponse} so both paths share one type
     */
    private Mono<ScrollResponse<ObjectNode>> buildRequest(Optional<String> scrollId) {
        return scrollId
            // A checked exception raised by client.scroll is rethrown as-is (sneaky throw).
            .map(Throwing.<String, Mono<ScrollResponse<ObjectNode>>>function(id ->
                client.scroll(new ScrollRequest.Builder()
                    .scrollId(id)
                    .scroll(TIMEOUT)
                    .build()))
                .sneakyThrow())
            .orElseGet(() -> {
                try {
                    return client.search(searchRequest)
                        .map(response -> new ScrollResponse.Builder<ObjectNode>()
                            .scrollId(response.scrollId())
                            .hits(response.hits())
                            .took(response.took())
                            .timedOut(response.timedOut())
                            .shards(response.shards())
                            .build());
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            });
    }

    /**
     * Sends a clear-scroll request for the scroll id currently held, if any.
     * The request is subscribed to without waiting for, or observing, its outcome.
     *
     * @param atomicReference holder of the last known scroll id
     */
    public void close(AtomicReference<Optional<String>> atomicReference) {
        atomicReference.get()
            .map(id -> new ClearScrollRequest.Builder()
                .scrollId(id)
                .build())
            .ifPresent(Throwing.<ClearScrollRequest>consumer(clearScrollRequest ->
                client.clearScroll(clearScrollRequest).subscribe())
                .sneakyThrow());
    }
}
