/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.action.search;

import java.io.IOException;
import java.util.function.BiFunction;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.OriginalIndices;
import org.opensearch.action.search.SearchActionListener;
import org.opensearch.action.search.SearchShardTask;
import org.opensearch.action.search.SearchTask;
import org.opensearch.action.search.SearchTransportService;
import org.opensearch.action.search.StreamSearchActionListener;
import org.opensearch.action.support.StreamSearchChannelListener;
import org.opensearch.common.settings.Setting;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType;
import org.opensearch.search.SearchPhaseResult;
import org.opensearch.search.SearchService;
import org.opensearch.search.dfs.DfsSearchResult;
import org.opensearch.search.fetch.FetchSearchResult;
import org.opensearch.search.fetch.QueryFetchSearchResult;
import org.opensearch.search.fetch.ShardFetchRequest;
import org.opensearch.search.fetch.ShardFetchSearchRequest;
import org.opensearch.search.internal.ShardSearchContextId;
import org.opensearch.search.internal.ShardSearchRequest;
import org.opensearch.search.query.QuerySearchResult;
import org.opensearch.tasks.Task;
import org.opensearch.transport.StreamTransportResponseHandler;
import org.opensearch.transport.StreamTransportService;
import org.opensearch.transport.Transport;
import org.opensearch.transport.TransportException;
import org.opensearch.transport.TransportRequest;
import org.opensearch.transport.TransportRequestOptions;
import org.opensearch.transport.stream.StreamTransportResponse;

public class StreamSearchTransportService
extends SearchTransportService {
    private final Logger logger = LogManager.getLogger(StreamSearchTransportService.class);
    private final StreamTransportService transportService;
    public static final Setting<Boolean> STREAM_SEARCH_ENABLED = Setting.boolSetting("stream.search.enabled", false, Setting.Property.Dynamic, Setting.Property.NodeScope);

    public StreamSearchTransportService(StreamTransportService transportService, BiFunction<Transport.Connection, SearchActionListener, ActionListener> responseWrapper) {
        super(transportService, responseWrapper);
        this.transportService = transportService;
    }

    public static void registerStreamRequestHandler(StreamTransportService transportService, SearchService searchService) {
        transportService.registerRequestHandler("indices:data/read/search[phase/query]", "same", false, true, AdmissionControlActionType.SEARCH, ShardSearchRequest::new, (request, channel, task) -> searchService.executeQueryPhase((ShardSearchRequest)request, false, (SearchShardTask)task, new StreamSearchChannelListener(channel, "indices:data/read/search[phase/query]", (ShardSearchRequest)request), "stream_search", true));
        transportService.registerRequestHandler("indices:data/read/search[phase/fetch/id]", "same", true, true, AdmissionControlActionType.SEARCH, ShardFetchSearchRequest::new, (request, channel, task) -> searchService.executeFetchPhase((ShardFetchRequest)request, (SearchShardTask)task, new StreamSearchChannelListener(channel, "indices:data/read/search[phase/fetch/id]", (ShardFetchSearchRequest)request), "stream_search"));
        transportService.registerRequestHandler("indices:data/read/search[can_match]", "same", ShardSearchRequest::new, (request, channel, task) -> searchService.canMatch((ShardSearchRequest)request, new StreamSearchChannelListener(channel, "indices:data/read/search[can_match]", (ShardSearchRequest)request)));
        transportService.registerRequestHandler("indices:data/read/search[free_context]", "same", SearchTransportService.SearchFreeContextRequest::new, (request, channel, task) -> {
            boolean freed = searchService.freeReaderContext(request.id());
            channel.sendResponseBatch(new SearchTransportService.SearchFreeContextResponse(freed));
            channel.completeStream();
        });
        transportService.registerRequestHandler("indices:data/read/search[phase/dfs]", "same", false, true, AdmissionControlActionType.SEARCH, ShardSearchRequest::new, (request, channel, task) -> searchService.executeDfsPhase((ShardSearchRequest)request, false, (SearchShardTask)task, new StreamSearchChannelListener(channel, "indices:data/read/search[phase/dfs]", (ShardSearchRequest)request), "stream_search"));
    }

    @Override
    public void sendExecuteQuery(Transport.Connection connection, ShardSearchRequest request, SearchTask task, final SearchActionListener<SearchPhaseResult> listener) {
        boolean fetchDocuments = request.numberOfShards() == 1;
        final Writeable.Reader reader = fetchDocuments ? QueryFetchSearchResult::new : QuerySearchResult::new;
        final StreamSearchActionListener streamListener = (StreamSearchActionListener)listener;
        StreamTransportResponseHandler<SearchPhaseResult> transportHandler = new StreamTransportResponseHandler<SearchPhaseResult>(){

            @Override
            public void handleStreamResponse(StreamTransportResponse<SearchPhaseResult> response) {
                try {
                    SearchPhaseResult currentResult;
                    SearchPhaseResult lastResult = null;
                    while ((currentResult = response.nextResponse()) != null) {
                        if (lastResult != null) {
                            streamListener.onStreamResponse(lastResult, false);
                        }
                        lastResult = currentResult;
                    }
                    if (lastResult != null) {
                        streamListener.onStreamResponse(lastResult, true);
                        StreamSearchTransportService.this.logger.debug("Processed final stream response");
                    } else {
                        StreamSearchTransportService.this.logger.error("Empty stream");
                    }
                    response.close();
                }
                catch (Exception e) {
                    response.cancel("Client error during search phase", e);
                    streamListener.onFailure(e);
                }
            }

            @Override
            public void handleException(TransportException e) {
                listener.onFailure((Exception)((Object)e));
            }

            @Override
            public String executor() {
                return "stream_search";
            }

            public SearchPhaseResult read(StreamInput in) throws IOException {
                return (SearchPhaseResult)((Object)reader.read(in));
            }
        };
        this.transportService.sendChildRequest(connection, "indices:data/read/search[phase/query]", request, task, transportHandler);
    }

    @Override
    public void sendExecuteFetch(Transport.Connection connection, ShardFetchSearchRequest request, SearchTask task, final SearchActionListener<FetchSearchResult> listener) {
        StreamTransportResponseHandler<FetchSearchResult> transportHandler = new StreamTransportResponseHandler<FetchSearchResult>(){

            @Override
            public void handleStreamResponse(StreamTransportResponse<FetchSearchResult> response) {
                try {
                    FetchSearchResult result = response.nextResponse();
                    listener.onResponse(result);
                    response.close();
                }
                catch (Exception e) {
                    response.cancel("Client error during fetch phase", e);
                    listener.onFailure(e);
                }
            }

            @Override
            public void handleException(TransportException exp) {
                listener.onFailure((Exception)((Object)exp));
            }

            @Override
            public String executor() {
                return "stream_search";
            }

            public FetchSearchResult read(StreamInput in) throws IOException {
                return new FetchSearchResult(in);
            }
        };
        this.transportService.sendChildRequest(connection, "indices:data/read/search[phase/fetch/id]", request, task, transportHandler);
    }

    @Override
    public void sendCanMatch(Transport.Connection connection, ShardSearchRequest request, SearchTask task, final ActionListener<SearchService.CanMatchResponse> listener) {
        StreamTransportResponseHandler<SearchService.CanMatchResponse> transportHandler = new StreamTransportResponseHandler<SearchService.CanMatchResponse>(){

            @Override
            public void handleStreamResponse(StreamTransportResponse<SearchService.CanMatchResponse> response) {
                try {
                    SearchService.CanMatchResponse result = response.nextResponse();
                    if (response.nextResponse() != null) {
                        throw new IllegalStateException("Only one response expected from SearchService.CanMatchResponse");
                    }
                    listener.onResponse((Object)result);
                    response.close();
                }
                catch (Exception e) {
                    response.cancel("Client error during can match", e);
                    listener.onFailure(e);
                }
            }

            @Override
            public void handleException(TransportException exp) {
                listener.onFailure((Exception)((Object)exp));
            }

            @Override
            public String executor() {
                return "same";
            }

            public SearchService.CanMatchResponse read(StreamInput in) throws IOException {
                return new SearchService.CanMatchResponse(in);
            }
        };
        this.transportService.sendChildRequest(connection, "indices:data/read/search[can_match]", (TransportRequest)request, (Task)task, TransportRequestOptions.builder().withType(TransportRequestOptions.Type.STREAM).build(), transportHandler);
    }

    @Override
    public void sendFreeContext(Transport.Connection connection, ShardSearchContextId contextId, OriginalIndices originalIndices) {
        StreamTransportResponseHandler<SearchTransportService.SearchFreeContextResponse> transportHandler = new StreamTransportResponseHandler<SearchTransportService.SearchFreeContextResponse>(this){

            @Override
            public void handleStreamResponse(StreamTransportResponse<SearchTransportService.SearchFreeContextResponse> response) {
                try {
                    response.nextResponse();
                    response.close();
                }
                catch (Exception exception) {
                    // empty catch block
                }
            }

            @Override
            public void handleException(TransportException exp) {
            }

            @Override
            public String executor() {
                return "same";
            }

            public SearchTransportService.SearchFreeContextResponse read(StreamInput in) throws IOException {
                return new SearchTransportService.SearchFreeContextResponse(in);
            }
        };
        this.transportService.sendRequest(connection, "indices:data/read/search[free_context]", (TransportRequest)new SearchTransportService.SearchFreeContextRequest(originalIndices, contextId), TransportRequestOptions.builder().withType(TransportRequestOptions.Type.STREAM).build(), transportHandler);
    }

    @Override
    public void sendExecuteDfs(Transport.Connection connection, ShardSearchRequest request, SearchTask task, final SearchActionListener<DfsSearchResult> listener) {
        StreamTransportResponseHandler<DfsSearchResult> transportHandler = new StreamTransportResponseHandler<DfsSearchResult>(this){

            @Override
            public void handleStreamResponse(StreamTransportResponse<DfsSearchResult> response) {
                try {
                    DfsSearchResult result = response.nextResponse();
                    listener.onResponse(result);
                    response.close();
                }
                catch (Exception e) {
                    response.cancel("Client error during search phase", e);
                    listener.onFailure(e);
                }
            }

            @Override
            public void handleException(TransportException e) {
                listener.onFailure((Exception)((Object)e));
            }

            @Override
            public String executor() {
                return "stream_search";
            }

            public DfsSearchResult read(StreamInput in) throws IOException {
                return new DfsSearchResult(in);
            }
        };
        this.transportService.sendChildRequest(connection, "indices:data/read/search[phase/dfs]", (TransportRequest)request, (Task)task, TransportRequestOptions.builder().withType(TransportRequestOptions.Type.STREAM).build(), transportHandler);
    }
}

