import { bufferTime, filter, from, map, mergeAll, tap } from "rxjs";
import { createRxOneshotReq } from "rx-nostr";
import { markFromCache } from "applesauce-core/helpers";
import { logger } from "applesauce-core";
import { nanoid } from "nanoid";
import { Loader } from "./loader.js";
import { generatorSequence } from "../operators/generator-sequence.js";
import { distinctRelaysBatch } from "../operators/distinct-relays.js";
import { groupByRelay } from "../helpers/pointer.js";
import { consolidateEventPointers } from "../helpers/event-pointer.js";
function* cacheFirstSequence(rxNostr, pointers, opts, log) {
    let remaining = [...pointers];
    const id = nanoid(8);
    log = log.extend(id);
    const loaded = (packets) => {
        const ids = new Set(packets.map((p) => p.event.id));
        remaining = remaining.filter((p) => !ids.has(p.id));
    };
    if (opts?.cacheRequest) {
        let filter = { ids: remaining.map((e) => e.id) };
        const results = yield opts.cacheRequest([filter]).pipe(
        // mark the event as from the cache
        tap((event) => markFromCache(event)), 
        // convert to event packets
        map((e) => ({ event: e, from: "", subId: "single-event-loader", type: "EVENT" })));
        if (results.length > 0) {
            log(`Loaded ${results.length} events from cache`);
            loaded(results);
        }
    }
    // exit early if all pointers are loaded
    if (remaining.length === 0)
        return;
    let byRelay = groupByRelay(remaining, "default");
    // load remaining pointers from the relays
    let results = yield from(Array.from(byRelay.entries()).map(([relay, pointers]) => {
        let filter = { ids: pointers.map((e) => e.id) };
        let count = 0;
        const req = createRxOneshotReq({ filters: [filter], rxReqId: id });
        log(`Requesting from ${relay}`, filter.ids);
        let sub$;
        // don't specify relay if this is the "default" relay
        if (relay === "default")
            sub$ = rxNostr.use(req);
        else
            sub$ = rxNostr.use(req, { on: { relays: [relay] } });
        return sub$.pipe(tap({
            next: () => count++,
            complete: () => log(`Completed ${relay}, loaded ${count} events`),
        }));
    })).pipe(mergeAll());
    loaded(results);
    if (remaining.length > 0) {
        // failed to find remaining
        log("Failed to load", remaining.map((p) => p.id));
    }
}
export class SingleEventLoader extends Loader {
    log = logger.extend("SingleEventLoader");
    constructor(rxNostr, opts) {
        let options = opts || {};
        super((source) => source.pipe(
        // load first from cache
        bufferTime(opts?.bufferTime ?? 1000), 
        // ignore empty buffers
        filter((buffer) => buffer.length > 0), 
        // only request events from relays once
        distinctRelaysBatch((p) => p.id, options.refreshTimeout ?? 60_000), 
        // ensure there is only one of each event pointer
        map(consolidateEventPointers), 
        // run the loader sequence
        generatorSequence((pointers) => cacheFirstSequence(rxNostr, pointers, options, this.log), 
        // there will always be more events, never complete
        false)));
    }
}
