package net.shrine.messagequeuemiddleware;

import cats.effect.ContextShift;
import cats.effect.IO;
import cats.effect.IO$;
import com.typesafe.config.Config;
import fs2.Stream$;
import fs2.internal.FreeC;
import java.util.List;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import net.shrine.config.ConfigSource$;
import net.shrine.config.package$;
import net.shrine.crypto.SecureRandomSource$;
import net.shrine.http4s.catsio.ExecutionContexts$;
import net.shrine.log.Log$;
import net.shrine.messagequeuemiddleware.LocalMessageQueueMiddleware;
import net.shrine.messagequeueservice.DeliveryAttemptId;
import net.shrine.messagequeueservice.DeliveryAttemptId$;
import net.shrine.messagequeueservice.Message;
import net.shrine.messagequeueservice.MessageQueueService;
import net.shrine.messagequeueservice.Queue;
import scala.MatchError;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Tuple2;
import scala.UninitializedFieldError;
import scala.collection.IterableOnceOps;
import scala.collection.StringOps$;
import scala.collection.concurrent.Map;
import scala.collection.concurrent.TrieMap;
import scala.collection.concurrent.TrieMap$;
import scala.collection.immutable.Seq;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.Duration$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: LocalMessageQueueMiddleware.scala */
/* loaded from: input_file:WEB-INF/lib/shrine-message-service-SHRINE2020-1412-SNAPSHOT.jar:net/shrine/messagequeuemiddleware/LocalMessageQueueMiddleware$.class */
public final class LocalMessageQueueMiddleware$ implements MessageQueueService {
    /** The single instance of this class; every public entry point is reached through it. */
    public static final LocalMessageQueueMiddleware$ MODULE$ = new LocalMessageQueueMiddleware$();
    /** Configuration path whose subtree holds this middleware's settings (see config()). */
    private static final String configPath = "shrine.hub.messagequeue.blockingq";
    /**
     * Delivery attempts keyed by attempt id. Each value pairs the attempt with an optional
     * ScheduledFuture, which is handed to cancelScheduledMessageRedelivery when the message completes.
     */
    private static final TrieMap<DeliveryAttemptId, Tuple2<DeliveryAttempt, Option<ScheduledFuture<?>>>> net$shrine$messagequeuemiddleware$LocalMessageQueueMiddleware$$messageDeliveryAttemptMap;
    /** Message id (boxed long) to the ScheduledFuture that is cancelled via cancelExpiredMessageCleanup on completion. */
    private static final TrieMap<Object, ScheduledFuture<?>> net$shrine$messagequeuemiddleware$LocalMessageQueueMiddleware$$messageIdsToExpirationCleanupTasks;
    /** Queue name to the deque holding that queue's messages. */
    private static final Map<String, BlockingDeque<InternalMessage>> blockingQueuePool;
    /** ContextShift built from longActionExecutionContext; used to run the blocking deque reads. */
    private static final ContextShift<IO> logActionContextShift;
    /** Bit flags (1, 2, 4, 8, 16) set by the static initializer and checked by the field accessors. */
    private static volatile int bitmap$init$0;

    // Static initialization. Each accessor below tests one bit of bitmap$init$0 and throws
    // UninitializedFieldError when its bit is still clear, so the order of these statements matters.
    static {
        // Bit 1 guards configPath, which is a constant and needs no assignment here.
        bitmap$init$0 |= 1;
        net$shrine$messagequeuemiddleware$LocalMessageQueueMiddleware$$messageDeliveryAttemptMap = TrieMap$.MODULE$.empty2();
        // Bit 2: delivery-attempt map is assigned.
        bitmap$init$0 |= 2;
        net$shrine$messagequeuemiddleware$LocalMessageQueueMiddleware$$messageIdsToExpirationCleanupTasks = TrieMap$.MODULE$.empty2();
        // Bit 4: expiration-cleanup task map is assigned.
        bitmap$init$0 |= 4;
        blockingQueuePool = TrieMap$.MODULE$.empty2();
        // Bit 8: queue pool is assigned.
        bitmap$init$0 |= 8;
        logActionContextShift = IO$.MODULE$.contextShift(ExecutionContexts$.MODULE$.longActionExecutionContext());
        // Bit 16: context shift is assigned.
        bitmap$init$0 |= 16;
    }

    /**
     * Returns the configuration path under which this middleware's settings are stored.
     *
     * @return the constant configuration path
     * @throws UninitializedFieldError if initialization flag bit 1 has not been set yet
     */
    public String configPath() {
        final boolean initialized = (bitmap$init$0 & 1) != 0;
        if (initialized) {
            return configPath;
        }
        throw new UninitializedFieldError("Uninitialized field: /var/opt/bamboo/bamboo-home/local-working-dir/SHRINE-TRUNK523-JOB1/hub/message-service/src/main/scala/net/shrine/messagequeuemiddleware/LocalMessageQueueMiddleware.scala: 31");
    }

    /**
     * Looks up this middleware's configuration subtree.
     *
     * @return the Config found at configPath() inside the global ConfigSource configuration
     */
    private Config config() {
        final Config root = ConfigSource$.MODULE$.config();
        return root.getConfig(configPath());
    }

    /**
     * Reads the "messageTimeToLive" setting, parses it as a Duration and converts it to milliseconds.
     * The configuration is read again on every call.
     *
     * @return the configured message time-to-live in milliseconds
     */
    public long net$shrine$messagequeuemiddleware$LocalMessageQueueMiddleware$$messageTimeToLiveInMillis() {
        final Object timeToLive = package$.MODULE$.ConfigExtensions(config())
                .get("messageTimeToLive", text -> Duration$.MODULE$.apply(text));
        return ((Duration) timeToLive).toMillis();
    }

    /**
     * Reads the "messageRedeliveryDelay" setting, parses it as a Duration and converts it to milliseconds.
     * The configuration is read again on every call.
     *
     * @return the configured redelivery delay in milliseconds
     */
    private long messageRedeliveryDelay() {
        final Object redeliveryDelay = package$.MODULE$.ConfigExtensions(config())
                .get("messageRedeliveryDelay", text -> Duration$.MODULE$.apply(text));
        return ((Duration) redeliveryDelay).toMillis();
    }

    /**
     * Reads the "messageMaxDeliveryAttempts" integer setting from this middleware's configuration.
     *
     * @return the configured maximum number of delivery attempts
     */
    private int messageMaxDeliveryAttempts() {
        final Config settings = config();
        return settings.getInt("messageMaxDeliveryAttempts");
    }

    /**
     * Accessor for the map of delivery attempts keyed by attempt id.
     *
     * @return the shared delivery-attempt map
     * @throws UninitializedFieldError if initialization flag bit 2 has not been set yet
     */
    public TrieMap<DeliveryAttemptId, Tuple2<DeliveryAttempt, Option<ScheduledFuture<?>>>> net$shrine$messagequeuemiddleware$LocalMessageQueueMiddleware$$messageDeliveryAttemptMap() {
        final boolean initialized = (bitmap$init$0 & 2) != 0;
        if (initialized) {
            return net$shrine$messagequeuemiddleware$LocalMessageQueueMiddleware$$messageDeliveryAttemptMap;
        }
        throw new UninitializedFieldError("Uninitialized field: /var/opt/bamboo/bamboo-home/local-working-dir/SHRINE-TRUNK523-JOB1/hub/message-service/src/main/scala/net/shrine/messagequeuemiddleware/LocalMessageQueueMiddleware.scala: 42");
    }

    /**
     * Accessor for the map from message id to its scheduled expiration-cleanup task.
     *
     * @return the shared cleanup-task map
     * @throws UninitializedFieldError if initialization flag bit 4 has not been set yet
     */
    public TrieMap<Object, ScheduledFuture<?>> net$shrine$messagequeuemiddleware$LocalMessageQueueMiddleware$$messageIdsToExpirationCleanupTasks() {
        final boolean initialized = (bitmap$init$0 & 4) != 0;
        if (initialized) {
            return net$shrine$messagequeuemiddleware$LocalMessageQueueMiddleware$$messageIdsToExpirationCleanupTasks;
        }
        throw new UninitializedFieldError("Uninitialized field: /var/opt/bamboo/bamboo-home/local-working-dir/SHRINE-TRUNK523-JOB1/hub/message-service/src/main/scala/net/shrine/messagequeuemiddleware/LocalMessageQueueMiddleware.scala: 45");
    }

    /**
     * Shuts down the message scheduler.
     *
     * @return whatever the scheduler's shutDown() returns
     */
    public List<Runnable> stop() {
        final List<Runnable> shutdownResult = LocalMessageQueueMiddleware$MessageScheduler$.MODULE$.shutDown();
        return shutdownResult;
    }

    /**
     * Accessor for the pool of blocking deques, keyed by queue name.
     *
     * @return the shared queue pool
     * @throws UninitializedFieldError if initialization flag bit 8 has not been set yet
     */
    public Map<String, BlockingDeque<InternalMessage>> blockingQueuePool() {
        final boolean initialized = (bitmap$init$0 & 8) != 0;
        if (initialized) {
            return blockingQueuePool;
        }
        throw new UninitializedFieldError("Uninitialized field: /var/opt/bamboo/bamboo-home/local-working-dir/SHRINE-TRUNK523-JOB1/hub/message-service/src/main/scala/net/shrine/messagequeuemiddleware/LocalMessageQueueMiddleware.scala: 56");
    }

    /**
     * Describes the creation of a queue: when run, registers an empty deque under the queue's
     * name unless one is already present.
     *
     * @param str the requested queue name
     * @return an IO yielding the Queue built from {@code str}
     */
    @Override // net.shrine.messagequeueservice.MessageQueueService
    public IO<Queue> createQueueIfAbsentIO(String str) {
        return IO$.MODULE$.apply(() -> {
            final Queue requested = new Queue(str);
            final Map<String, BlockingDeque<InternalMessage>> pool = MODULE$.blockingQueuePool();
            pool.getOrElseUpdate(requested.name(), () -> new LinkedBlockingDeque<>());
            return requested;
        });
    }

    /**
     * Describes a lookup of an existing queue. When run, the IO fails with
     * QueueDoesNotExistException if no deque is registered under the queue's name.
     *
     * @param str the queue name to look up
     * @return an IO yielding the Queue built from {@code str}
     */
    @Override // net.shrine.messagequeueservice.MessageQueueService
    public IO<Queue> getQueueIO(String str) {
        return IO$.MODULE$.apply(() -> {
            final Queue requested = new Queue(str);
            final Map<String, BlockingDeque<InternalMessage>> pool = MODULE$.blockingQueuePool();
            // Only the existence check matters; the looked-up deque itself is discarded.
            pool.getOrElse(requested.name(), () -> {
                throw new QueueDoesNotExistException(new Queue(str));
            });
            return requested;
        });
    }

    /**
     * Describes the deletion of a queue. When run, removes the pool entry stored under
     * {@code str}; the IO fails with QueueDoesNotExistException if there was no such entry.
     *
     * @param str the pool key (queue name) to remove
     * @return an IO that yields unit once the entry has been removed
     */
    @Override // net.shrine.messagequeueservice.MessageQueueService
    public IO<BoxedUnit> deleteQueueIO(String str) {
        return IO$.MODULE$.apply(() -> {
            MODULE$.blockingQueuePool().remove(str).getOrElse(() -> {
                throw new QueueDoesNotExistException(new Queue(str));
            });
            // IO.apply takes a value-returning thunk; yield the unit value explicitly.
            return BoxedUnit.UNIT;
        });
    }

    /**
     * Describes a listing of all queues: when run, wraps every key of the queue pool in a Queue.
     *
     * @return an IO yielding one Queue per pool key
     */
    @Override // net.shrine.messagequeueservice.MessageQueueService
    public IO<Seq<Queue>> queuesIO() {
        return IO$.MODULE$.apply(() ->
                ((IterableOnceOps) MODULE$.blockingQueuePool().keys().map(name -> new Queue(name))).toSeq());
    }

    /**
     * Describes sending a message to a queue. When run, the IO:
     * looks up the queue's deque (failing with QueueDoesNotExistException if absent),
     * wraps the contents in an InternalMessage with a fresh id, the current time and the
     * configured maximum delivery attempts, schedules its expiration cleanup, appends it to the
     * tail of the deque and logs the resulting deque size at debug level.
     *
     * @param str   the message contents
     * @param queue the destination queue
     * @return an IO that yields unit once the message has been enqueued
     */
    @Override // net.shrine.messagequeueservice.MessageQueueService
    public IO<BoxedUnit> sendIO(String str, Queue queue) {
        return IO$.MODULE$.apply(() -> {
            BlockingDeque blockingDeque = (BlockingDeque) MODULE$.blockingQueuePool().getOrElse(queue.name(), () -> {
                throw new QueueDoesNotExistException(queue);
            });
            InternalMessage internalMessage = new InternalMessage(SecureRandomSource$.MODULE$.nextId(), str, System.currentTimeMillis(), queue, MODULE$.messageMaxDeliveryAttempts());
            // The cleanup task is scheduled before the message becomes visible in the deque.
            LocalMessageQueueMiddleware$MessageScheduler$.MODULE$.scheduleExpiredMessageCleanup(queue, internalMessage);
            // NOTE(review): putLast declares InterruptedException; the Scala original lets it
            // propagate unchecked, so a pure-Java build would still need to handle it here.
            scala.concurrent.package$.MODULE$.blocking(() -> {
                blockingDeque.putLast(internalMessage);
                // blocking(...) takes a value-returning thunk; yield unit explicitly.
                return BoxedUnit.UNIT;
            });
            Log$.MODULE$.debug(() -> {
                return new StringBuilder(49).append("After send to ").append(queue.name()).append(" - blockingQueue contains ").append(blockingDeque.size()).append(" messages").toString();
            });
            // IO.apply takes a value-returning thunk; yield the unit value explicitly.
            return BoxedUnit.UNIT;
        });
    }

    /**
     * Accessor for the ContextShift created from longActionExecutionContext.
     *
     * @return the shared context shift
     * @throws UninitializedFieldError if initialization flag bit 16 has not been set yet
     */
    private ContextShift<IO> logActionContextShift() {
        final boolean initialized = (bitmap$init$0 & 16) != 0;
        if (initialized) {
            return logActionContextShift;
        }
        throw new UninitializedFieldError("Uninitialized field: /var/opt/bamboo/bamboo-home/local-working-dir/SHRINE-TRUNK523-JOB1/hub/message-service/src/main/scala/net/shrine/messagequeuemiddleware/LocalMessageQueueMiddleware.scala: 95");
    }

    /**
     * Describes receiving at most one message from a queue, waiting up to {@code duration}.
     *
     * <p>Two things happen eagerly, when this method is called rather than when the IO runs:
     * the deadline (now + duration) is computed, and the queue's deque is looked up. A missing
     * queue therefore throws QueueDoesNotExistException directly from this method.
     * NOTE(review): because the deadline is fixed at call time, running the returned IO later
     * shortens the effective wait; confirm this is intended.
     *
     * <p>When run, the IO logs the deque size, polls the head of the deque on
     * longActionExecutionContext for the time remaining until the deadline, logs when nothing
     * arrived, and converts a received InternalMessage with toMessage.
     *
     * @param queue    the queue to receive from
     * @param duration the maximum time to wait, measured from the call to this method
     * @return an IO yielding the received message, or an empty Option if the poll returned nothing
     */
    @Override // net.shrine.messagequeueservice.MessageQueueService
    public IO<Option<Message>> receiveIO(Queue queue, Duration duration) {
        long currentTimeMillis = System.currentTimeMillis() + duration.toMillis();
        BlockingDeque blockingDeque = (BlockingDeque) blockingQueuePool().getOrElse(queue.name(), () -> {
            throw new QueueDoesNotExistException(queue);
        });
        return IO$.MODULE$.apply(() -> {
            Log$.MODULE$.debug(() -> {
                return new StringBuilder(55).append("Before receive from ").append(queue.name()).append(" - blockingQueue contains ").append(blockingDeque.size()).append(" messages").toString();
            });
            // IO.apply takes a value-returning thunk; this unit feeds the flatMap below.
            return BoxedUnit.UNIT;
        }).flatMap(boxedUnit -> {
            return MODULE$.logActionContextShift().evalOn(ExecutionContexts$.MODULE$.longActionExecutionContext(), IO$.MODULE$.apply(() -> {
                return (Option) scala.concurrent.package$.MODULE$.blocking(() -> {
                    // Option.apply maps the null returned by a timed-out poll to an empty Option.
                    return Option$.MODULE$.apply(scala.concurrent.package$.MODULE$.blocking(() -> {
                        return (InternalMessage) blockingDeque.pollFirst(currentTimeMillis - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
                    }));
                });
            }));
        }).map(option -> {
            if (option.isEmpty()) {
                Log$.MODULE$.debug(() -> {
                    return new StringBuilder(39).append("No message available from queue ").append(queue.name()).append(" after ").append(duration).toString();
                });
            }
            return option.map(internalMessage -> {
                return MODULE$.toMessage(queue, internalMessage);
            });
        });
    }

    /**
     * Builds a stream of messages taken from the given queue.
     *
     * <p>The debug log line and the deque lookup happen when this method is called, so a missing
     * queue throws QueueDoesNotExistException directly from here. The returned stream repeatedly
     * evaluates an IO that takes the head of the deque (takeFirst, inside a blocking section)
     * on longActionExecutionContext and converts each InternalMessage with toMessage.
     *
     * @param queue the queue to stream from
     * @return the repeating stream of messages from that queue
     */
    @Override // net.shrine.messagequeueservice.MessageQueueService
    public FreeC<IO, Message, BoxedUnit> receiveStream(Queue queue) {
        Log$.MODULE$.debug(() -> {
            return new StringBuilder(27).append("Starting stream from queue ").append(queue.name()).toString();
        });
        // Resolved once, up front: every element of the stream reads from this same deque instance.
        BlockingDeque blockingDeque = (BlockingDeque) blockingQueuePool().getOrElse(queue.name(), () -> {
            throw new QueueDoesNotExistException(queue);
        });
        return Stream$.MODULE$.repeatEval(logActionContextShift().evalOn(ExecutionContexts$.MODULE$.longActionExecutionContext(), IO$.MODULE$.apply(() -> {
            return (InternalMessage) scala.concurrent.package$.MODULE$.blocking(() -> {
                return (InternalMessage) blockingDeque.takeFirst();
            });
        })).map(internalMessage -> {
            return MODULE$.toMessage(queue, internalMessage);
        }));
    }

    /**
     * Turns an internal message into a deliverable Message: creates a new delivery-attempt id,
     * records the attempt, schedules its redelivery with the configured delay and maximum
     * attempts, and logs a truncated (70-character) description of both at debug level.
     *
     * @param queue           the queue the message was taken from
     * @param internalMessage the message being delivered
     * @return the LocalMessage handed to the receiver
     */
    public Message toMessage(Queue queue, InternalMessage internalMessage) {
        final long attemptId = DeliveryAttemptId$.MODULE$.create();
        final DeliveryAttempt attempt = new DeliveryAttempt(internalMessage, internalMessage.createdTime(), queue);
        LocalMessageQueueMiddleware$MessageScheduler$.MODULE$.scheduleMessageRedelivery(
                attemptId, attempt, messageRedeliveryDelay(), messageMaxDeliveryAttempts());
        final LocalMessageQueueMiddleware.LocalMessage delivered = new LocalMessageQueueMiddleware.LocalMessage(
                attemptId, messageRedeliveryDelay(), internalMessage.remainingAttempts(), internalMessage.contents());
        Log$.MODULE$.debug(() -> {
            final String attemptText = StringOps$.MODULE$.take$extension(Predef$.MODULE$.augmentString(attempt.toString()), 70);
            final String messageText = StringOps$.MODULE$.take$extension(Predef$.MODULE$.augmentString(delivered.toString()), 70);
            return attemptText + " scheduled to expire for " + messageText;
        });
        return delivered;
    }

    /**
     * Describes completing a delivered message. When run, the IO looks up the delivery attempt
     * with the given id and, if found, performs the completion clean-up for its message; if no
     * such attempt exists it fails with MessageDoesNotExistAndCannotBeCompletedException.
     *
     * @param j the underlying value of the delivery-attempt id to complete
     * @return an IO that yields unit once the message has been completed
     */
    public IO<BoxedUnit> completeMessage(long j) {
        return IO$.MODULE$.apply(() -> {
            MODULE$.net$shrine$messagequeuemiddleware$LocalMessageQueueMiddleware$$messageDeliveryAttemptMap().get(new DeliveryAttemptId(j)).fold(() -> {
                throw new MessageDoesNotExistAndCannotBeCompletedException(j);
            }, tuple2 -> {
                $anonfun$completeMessage$3(tuple2);
                return BoxedUnit.UNIT;
            });
            // IO.apply takes a value-returning thunk; yield the unit value explicitly.
            return BoxedUnit.UNIT;
        });
    }

    /**
     * Filter used while completing a message: accepts a map entry only when the entry itself is
     * non-null and its value is a (non-null) Tuple2.
     *
     * @param tuple2 a (DeliveryAttemptId, value) entry of the delivery-attempt map
     * @return true when the entry has the expected shape, false otherwise
     */
    public static final /* synthetic */ boolean $anonfun$completeMessage$4(Tuple2 tuple2) {
        if (tuple2 == null) {
            return false;
        }
        final long attemptId = ((DeliveryAttemptId) tuple2.mo5280_1()).underlying();
        final Tuple2 attemptAndTask = (Tuple2) tuple2.mo5279_2();
        return (new DeliveryAttemptId(attemptId) instanceof DeliveryAttemptId) && (attemptAndTask instanceof Tuple2);
    }

    /**
     * Hands the given expiration-cleanup task to the scheduler for cancellation.
     *
     * @param scheduledFuture the task to cancel
     */
    public static final /* synthetic */ void $anonfun$completeMessage$6(ScheduledFuture scheduledFuture) {
        final LocalMessageQueueMiddleware$MessageScheduler$ scheduler = LocalMessageQueueMiddleware$MessageScheduler$.MODULE$;
        scheduler.cancelExpiredMessageCleanup(scheduledFuture);
    }

    /**
     * Completion clean-up for the message behind one delivery attempt.
     *
     * <p>Walks the whole delivery-attempt map and, for every attempt whose message id equals the
     * completed message's id: removes the attempt, cancels its scheduled redelivery, and removes
     * and cancels the message's expiration-cleanup task. Afterwards the message is removed from
     * its queue's deque and a debug line is logged.
     *
     * <p>The deque lookup happens after the map clean-up, so a QueueDoesNotExistException for a
     * queue that is no longer in the pool is thrown only once the attempts are already removed.
     *
     * @param tuple2 the map value for the completed attempt; its first component is the DeliveryAttempt
     */
    public static final /* synthetic */ void $anonfun$completeMessage$3(Tuple2 tuple2) {
        InternalMessage message = ((DeliveryAttempt) tuple2.mo5280_1()).message();
        Queue queue = message.toQueue();
        MODULE$.net$shrine$messagequeuemiddleware$LocalMessageQueueMiddleware$$messageDeliveryAttemptMap().withFilter(entry -> {
            return BoxesRunTime.boxToBoolean($anonfun$completeMessage$4(entry));
        }).foreach(entry -> {
            Object obj;
            if (entry != null) {
                long underlying = ((DeliveryAttemptId) entry.mo5280_1()).underlying();
                // The map value: (DeliveryAttempt, Option<ScheduledFuture>). Kept under its own
                // name so it cannot be confused with the enclosing map entry.
                Tuple2 attemptAndTask = (Tuple2) entry.mo5279_2();
                if ((new DeliveryAttemptId(underlying) instanceof DeliveryAttemptId) && (attemptAndTask instanceof Tuple2)) {
                    if (((DeliveryAttempt) attemptAndTask.mo5280_1()).message().id() == message.id()) {
                        MODULE$.net$shrine$messagequeuemiddleware$LocalMessageQueueMiddleware$$messageDeliveryAttemptMap().remove(new DeliveryAttemptId(underlying));
                        LocalMessageQueueMiddleware$MessageScheduler$.MODULE$.cancelScheduledMessageRedelivery((Option) attemptAndTask.mo5279_2());
                        obj = MODULE$.net$shrine$messagequeuemiddleware$LocalMessageQueueMiddleware$$messageIdsToExpirationCleanupTasks().remove(BoxesRunTime.boxToLong(message.id())).map(scheduledFuture -> {
                            $anonfun$completeMessage$6(scheduledFuture);
                            return BoxedUnit.UNIT;
                        });
                    } else {
                        obj = BoxedUnit.UNIT;
                    }
                    return obj;
                }
            }
            // Entries of any other shape are already excluded by the filter above.
            throw new MatchError(entry);
        });
        ((BlockingDeque) MODULE$.blockingQueuePool().getOrElse(queue.name(), () -> {
            throw new QueueDoesNotExistException(queue);
        })).remove(message);
        Log$.MODULE$.debug(() -> {
            return new StringBuilder(23).append("Message from ").append(((DeliveryAttempt) tuple2.mo5280_1()).fromQueue()).append(" completed").toString();
        });
    }

    /** Private so that the only instance is the one stored in {@code MODULE$}. */
    private LocalMessageQueueMiddleware$() {
        // Intentionally empty: all state lives in static fields set up by the static initializer.
    }
}
