/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hc.core5.reactive;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hc.core5.function.Callback;
import org.apache.hc.core5.http.EntityDetails;
import org.apache.hc.core5.http.Header;
import org.apache.hc.core5.http.HttpException;
import org.apache.hc.core5.http.HttpRequest;
import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
import org.apache.hc.core5.http.nio.CapacityChannel;
import org.apache.hc.core5.http.nio.DataStreamChannel;
import org.apache.hc.core5.http.nio.ResponseChannel;
import org.apache.hc.core5.http.protocol.HttpContext;
import org.apache.hc.core5.reactive.ReactiveDataConsumer;
import org.apache.hc.core5.reactive.ReactiveDataProducer;
import org.apache.hc.core5.reactive.ReactiveRequestProcessor;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;

/**
 * An {@link AsyncServerExchangeHandler} that bridges a server-side message exchange
 * to a {@link ReactiveRequestProcessor}.
 *
 * <p>Incoming request body events ({@code updateCapacity}, {@code consume},
 * {@code streamEnd}) are forwarded to a {@link ReactiveDataConsumer}. The response body
 * is written by a {@link ReactiveDataProducer} that is created only once the request
 * processor hands a response {@link Publisher} to the callback supplied in
 * {@link #handleRequest}. Until then, the output-side methods behave as if there were
 * nothing to write.
 *
 * <p>NOTE(review): the producer reference is kept in an {@link AtomicReference} and the
 * channel in a {@code volatile} field, presumably because the processor callback may run
 * on a different thread than the I/O callbacks; confirm against the processor contract.
 */
public final class ReactiveServerExchangeHandler implements AsyncServerExchangeHandler {
    /** Application logic that consumes the request and eventually supplies a response body publisher. */
    private final ReactiveRequestProcessor requestProcessor;

    /** Response body producer; holds {@code null} until the processor callback has been invoked. */
    private final AtomicReference<ReactiveDataProducer> responseProducer = new AtomicReference<>();

    /** Receives the request body events forwarded by this handler. */
    private final ReactiveDataConsumer requestConsumer;

    /** Most recent channel passed to {@link #produce}; {@code null} until the first such call. */
    private volatile DataStreamChannel channel;

    /**
     * Creates a handler that delegates request processing to the given processor.
     *
     * @param requestProcessor the processor invoked from {@link #handleRequest}
     */
    public ReactiveServerExchangeHandler(ReactiveRequestProcessor requestProcessor) {
        this.requestProcessor = requestProcessor;
        this.requestConsumer = new ReactiveDataConsumer();
    }

    /**
     * Passes the request to the {@link ReactiveRequestProcessor} together with the request
     * body consumer and a callback through which the processor supplies the response body.
     *
     * <p>When the callback fires, a {@link ReactiveDataProducer} is created for the supplied
     * publisher, bound to the output channel if one has already been seen, published via
     * {@link #responseProducer}, and finally subscribed to the publisher.
     *
     * @param request the incoming request head
     * @param entityDetails the request entity details, passed through to the processor
     * @param responseChannel the channel handed to the processor for submitting the response
     * @param context the exchange context, passed through to the processor
     * @throws HttpException if thrown by the request processor
     * @throws IOException if thrown by the request processor
     */
    @Override
    public void handleRequest(HttpRequest request, EntityDetails entityDetails, ResponseChannel responseChannel, HttpContext context) throws HttpException, IOException {
        Callback<Publisher<ByteBuffer>> callback = publisher -> {
            ReactiveDataProducer producer = new ReactiveDataProducer(publisher);
            // Read the volatile field once so the null check and the use see the same value.
            DataStreamChannel currentChannel = this.channel;
            if (currentChannel != null) {
                producer.setChannel(currentChannel);
            }
            // Publish the producer before subscribing, so the other handler methods can
            // already reach it while the subscription is being set up.
            this.responseProducer.set(producer);
            publisher.subscribe(producer);
        };
        this.requestProcessor.processRequest(request, entityDetails, responseChannel, context, this.requestConsumer, callback);
    }

    /**
     * Reports a failure to the request consumer and, if a response producer exists,
     * to the producer as well.
     *
     * @param cause the failure to propagate
     */
    @Override
    public void failed(Exception cause) {
        this.requestConsumer.failed(cause);
        ReactiveDataProducer producer = this.responseProducer.get();
        if (producer != null) {
            producer.onError(cause);
        }
    }

    /**
     * Forwards the capacity channel to the request consumer.
     *
     * @param capacityChannel the channel forwarded to the consumer
     * @throws IOException if thrown by the request consumer
     */
    @Override
    public void updateCapacity(CapacityChannel capacityChannel) throws IOException {
        this.requestConsumer.updateCapacity(capacityChannel);
    }

    /**
     * Forwards a chunk of request body data to the request consumer.
     *
     * @param src the buffer forwarded to the consumer
     * @throws IOException if thrown by the request consumer
     */
    @Override
    public void consume(ByteBuffer src) throws IOException {
        this.requestConsumer.consume(src);
    }

    /**
     * Forwards the end-of-stream notification to the request consumer.
     *
     * @param trailers the trailers forwarded to the consumer
     * @throws HttpException if thrown by the request consumer
     * @throws IOException if thrown by the request consumer
     */
    @Override
    public void streamEnd(List<? extends Header> trailers) throws HttpException, IOException {
        this.requestConsumer.streamEnd(trailers);
    }

    /**
     * Returns the value reported by the response producer's {@code available()}.
     *
     * @return the producer's value, or {@code 0} while no producer has been installed
     */
    @Override
    public int available() {
        ReactiveDataProducer producer = this.responseProducer.get();
        if (producer == null) {
            return 0;
        }
        return producer.available();
    }

    /**
     * Remembers the output channel and, if a response producer exists, asks it to produce
     * to that channel.
     *
     * <p>The channel is stored first so that a producer created later by the
     * {@link #handleRequest} callback can be bound to it.
     *
     * @param channel the channel to write response data to
     * @throws IOException if thrown by the response producer
     */
    @Override
    public void produce(DataStreamChannel channel) throws IOException {
        this.channel = channel;
        ReactiveDataProducer producer = this.responseProducer.get();
        if (producer != null) {
            producer.produce(channel);
        }
    }

    /**
     * Releases the response producer's resources, if a producer exists, and then the
     * request consumer's resources.
     */
    @Override
    public void releaseResources() {
        ReactiveDataProducer producer = this.responseProducer.get();
        if (producer != null) {
            producer.releaseResources();
        }
        this.requestConsumer.releaseResources();
    }
}

