/*
 * Decompiled with CFR 0.152.
 */
package com.rabbitmq.stream.impl;

import com.rabbitmq.stream.Consumer;
import com.rabbitmq.stream.impl.StreamConsumerBuilder;
import com.rabbitmq.stream.impl.StreamEnvironment;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link Consumer} facade over a super stream: it creates one underlying consumer per
 * partition reported by the environment's locator and closes them all together.
 */
class SuperStreamConsumer implements Consumer {
    private static final Logger LOGGER = LoggerFactory.getLogger(SuperStreamConsumer.class);

    /** Name of the super stream; used for logging only in this class. */
    private final String superStream;

    /** Underlying consumers, keyed by the partition (stream) name they were built for. */
    private final Map<String, Consumer> consumers = new ConcurrentHashMap<String, Consumer>();

    /**
     * Creates one consumer for each partition of {@code superStream}.
     *
     * <p>If looking up the partitions or building any partition consumer fails with a
     * runtime exception, the consumers created so far are closed before the exception is
     * rethrown; otherwise they would be unreachable and could never be closed.
     *
     * @param builder template builder; it is duplicated for each partition, with the super
     *     stream setting cleared and the stream set to the partition name
     * @param superStream name of the super stream
     * @param environment environment whose locator provides the partitions
     */
    SuperStreamConsumer(StreamConsumerBuilder builder, String superStream, StreamEnvironment environment) {
        this.superStream = superStream;
        try {
            for (String partition : environment.locator().partitions(superStream)) {
                Consumer consumer = builder.duplicate().superStream(null).stream(partition).build();
                this.consumers.put(partition, consumer);
                LOGGER.debug("Created consumer on stream '{}' for super stream '{}'", partition, superStream);
            }
        } catch (RuntimeException e) {
            // The caller never receives this instance when construction fails, so release
            // the partition consumers that were already created before propagating.
            closeConsumers();
            throw e;
        }
    }

    /**
     * Not supported for super streams.
     *
     * @param offset ignored
     * @throws UnsupportedOperationException always
     */
    @Override
    public void store(long offset) {
        throw new UnsupportedOperationException("Consumer#store(long) does not work for super streams, use MessageHandler.Context#storeOffset() instead");
    }

    /**
     * Closes every partition consumer. A failure while closing one consumer is logged and
     * does not prevent the remaining consumers from being closed.
     */
    @Override
    public void close() {
        closeConsumers();
    }

    /** Closes each registered partition consumer, logging (not propagating) failures. */
    private void closeConsumers() {
        for (Map.Entry<String, Consumer> entry : this.consumers.entrySet()) {
            LOGGER.debug("Closing consumer for partition '{}' of super stream {}", entry.getKey(), this.superStream);
            try {
                entry.getValue().close();
            } catch (Exception e) {
                LOGGER.info("Error while closing consumer for partition {} of super stream {}: {}", entry.getKey(), this.superStream, e.getMessage());
            }
        }
    }
}

