All files / bus/src/handler handler.ts

100% Statements 239/239
95.65% Branches 22/23
100% Functions 7/7
100% Lines 239/239

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 2401x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 17x 17x 27x 27x 27x 27x 27x 27x 27x 27x 27x 27x 27x 27x 27x 27x 27x 27x 27x 27x 27x 27x 27x 27x 27x 27x 27x 27x 27x 27x 27x 27x 12x 11x 12x 12x 12x 9x 9x 9x 9x 12x 12x 3x 3x 3x 3x 3x 1x 2x 2x 2x 2x 2x 2x 2x 1x 11x 11x 11x 12x 12x 27x 27x 27x 27x 27x 27x 27x 27x 27x 27x 27x 27x 27x 27x 27x 27x 27x 27x 27x 27x 27x 27x 27x 27x 27x 27x 24x 24x 24x 24x 24x 27x 27x 27x 27x 27x 27x 27x 27x 27x 27x 27x 27x 27x 27x 27x 27x 27x 27x 27x 27x 27x 27x 27x 27x 27x 27x 27x 27x 27x 27x 27x 27x 27x 27x 27x 27x 27x 27x 27x 27x 27x 27x 27x 27x 27x 27x 27x 27x 27x 27x 27x 27x 27x 27x 27x 27x 27x 27x 27x 27x 27x 27x 27x 27x 27x 27x 1x 1x 1x 1x 1x 1x 1x 1x  
/* eslint-disable @typescript-eslint/no-unnecessary-type-assertion */
 
import { Kernel, Singleton, Symbol, Thread, TypeOf } from '@sgrud/core';
import { Endpoint, wrap } from 'comlink';
import { NodeEndpoint } from 'comlink/dist/umd/node-adapter';
import { Observable, ObservableInput, ReplaySubject, Subscribable, Subscription, connectable, firstValueFrom, from, map, switchMap } from 'rxjs';
import { Bus } from '../bus/bus';
import { BusWorker } from '../worker';
import { name } from '../worker/package.json';
 
/**
 * The **BusHandler** is the entry point for establishing, transferring and
 * tearing down any number of {@link Observable} streams. All heavy lifting is
 * delegated to the {@link BusWorker} {@link Thread} running in the background.
 * Streams are designated and organized through the string literal helper type
 * {@link Bus.Handle}. Consider the following hierarchy of handles:
 *
 * ```text
 * io.github.sgrud
 * ├── io.github.sgrud.core
 * │   ├── io.github.sgrud.core.kernel
 * │   └── io.github.sgrud.core.transit
 * ├── io.github.sgrud.data
 * │   ├── io.github.sgrud.data.model.current
 * │   └── io.github.sgrud.data.model.global
 * ├── io.github.sgrud.shell
 * │   └── io.github.sgrud.shell.route
 * └── io.github.sgrud.store
 *     ├── io.github.sgrud.store.global
 *     └── io.github.sgrud.store.local
 * ```
 *
 * The {@link Bus.Handle} passed to {@link observe} decides the scope of the
 * resulting {@link Observable}: the root handle `io.github.sgrud` yields the
 * {@link Bus.Value}s of every stream established beneath it, whereas a leaf
 * handle, e.g., `io.github.sgrud.core.kernel`, yields only the
 * {@link Bus.Value}s of that single stream.
 *
 * @decorator {@link Singleton}
 *
 * @see {@link BusWorker}
 */
@Singleton()
export class BusHandler {

  /**
   * Private static {@link ReplaySubject} acting as the {@link BusHandler}
   * **loader**. It replays the {@link BusHandler} instance once its
   * {@link worker} has been resolved, and errors if resolving it failed.
   */
  private static loader: ReplaySubject<BusHandler>;

  /**
   * Static initialization block, creating the {@link loader} with a replay
   * buffer of one, so late subscribers still receive the {@link BusHandler}.
   */
  static {
    this.loader = new ReplaySubject<BusHandler>(1);
  }

  /**
   * Static `Symbol.observable` method exposing the private {@link loader} as a
   * {@link Subscribable}. This allows the {@link BusHandler} class itself to
   * be used as {@link ObservableInput}, e.g., to defer work until the
   * {@link BusHandler} has been successfully initialized.
   *
   * @returns A {@link Subscribable} emitting this {@link BusHandler}.
   *
   * @example
   * Subscribe to the {@link BusHandler}:
   * ```ts
   * import { BusHandler } from '@sgrud/bus';
   * import { from } from 'rxjs';
   *
   * from(BusHandler).subscribe(console.log);
   * ```
   */
  public static [Symbol.observable](): Subscribable<BusHandler> {
    return this.loader.asObservable();
  }

  /**
   * The **worker** {@link Thread} and main background workhorse. The underlying
   * {@link BusWorker} is run inside a {@link Worker} context in the background
   * and transparently handles {@link publish}ed and {@link observe}d streams
   * and the aggregation of their values depending on their {@link Bus.Handle},
   * i.e., hierarchy.
   *
   * @see {@link BusWorker}
   */
  public readonly worker: Thread<BusWorker>;

  /**
   * Public {@link BusHandler} **constructor**. As the {@link BusHandler} is a
   * {@link Singleton} class, this **constructor** is only invoked the first
   * time it is targeted by the `new` operator. Upon this first invocation, the
   * {@link worker} property is assigned the pending {@link BusWorker}
   * {@link Thread}. Within a `process` environment the {@link BusWorker} is
   * spawned through `worker_threads`, otherwise the {@link Kernel} is used to
   * resolve the {@link BusWorker} module from the supplied `source`, if any,
   * falling back to the `nodeModules` of the {@link Kernel}.
   *
   * @param source - An optional {@link Kernel.Module} `source`.
   * @throws A {@link ReferenceError} when the environment is incompatible.
   * Note that this error is raised within the asynchronous initialization and
   * therefore surfaces as rejection of the {@link worker} and as error of the
   * {@link loader}, not synchronously.
   */
  public constructor(source?: string) {
    this.worker = (async() => {
      let endpoint: Endpoint | NodeEndpoint;

      if (TypeOf.process(globalThis.process)) {
        // node: spawn a worker thread and adapt it to the comlink interface
        const { Worker: NodeWorker } = require('worker_threads');
        const thread = new NodeWorker(require.resolve(name));

        const adapter = require('comlink/dist/umd/node-adapter');
        endpoint = adapter(thread);
      } else {
        // browser: let the kernel resolve the worker module to load
        const kernel = new Kernel();
        const origin = source || `${kernel.nodeModules}/${name}`;
        const manifest = await firstValueFrom(kernel.resolve(name, origin));

        // a truthy global `sgrud` selects the classic `unpkg` bundle,
        // otherwise the `exports` bundle is loaded as module worker
        const classic = Boolean(globalThis.sgrud);
        const bundle = classic ? manifest.unpkg : manifest.exports;

        if (!bundle) {
          throw new ReferenceError(manifest.name);
        }

        endpoint = new Worker(`${origin}/${bundle}`, {
          type: classic ? 'classic' : 'module'
        });
      }

      return wrap(endpoint as Endpoint);
    })();

    // announce this instance through the loader once the worker is resolved
    from(this.worker).pipe(
      map(() => this)
    ).subscribe(BusHandler.loader);
  }

  /**
   * Invoking this method **observe**s the {@link Observable} stream represented
   * by the supplied `handle`. The returned {@link Observable} originates from
   * the {@link BusWorker} and emits all {@link Bus.Value}s published under the
   * supplied `handle`. When the **observe** method is invoked with
   * `'io.github.sgrud'`, all streams hierarchically beneath this
   * {@link Bus.Handle}, e.g., `'io.github.sgrud.bus.status'`, will also be
   * emitted by the returned {@link Observable}.
   *
   * @param handle - The {@link Bus.Handle} to **observe**.
   * @typeParam T - The type of the **observe**d {@link Observable} stream.
   * @returns An {@link Observable} bus for `handle`.
   *
   * @example
   * **observe** the `'io.github.sgrud.example'` stream:
   * ```ts
   * import { BusHandler } from '@sgrud/bus';
   *
   * const busHandler = new BusHandler();
   * const handle = 'io.github.sgrud.example';
   *
   * busHandler.observe(handle).subscribe(console.log);
   * ```
   */
  public observe<T>(handle: Bus.Handle): Observable<Bus.Value<T>> {
    const thread$ = from(this.worker);

    // first wait for the remote call to settle, then flatten its result
    const remote$ = thread$.pipe(
      switchMap((thread) => thread.observe(handle))
    );

    return remote$.pipe(
      switchMap((bus) => bus as Observable<Bus.Value<T>>)
    );
  }

  /**
   * Invoking this method **publish**es the supplied {@link Observable} `stream`
   * under the supplied `handle`. The supplied `stream` is connected to a
   * {@link ReplaySubject} immediately, i.e., before the {@link worker} is
   * resolved and regardless of whether the returned {@link Observable} is
   * subscribed to. This method returns an {@link Observable} of the
   * **publish**ment of the supplied `stream` under the supplied `handle` with
   * the {@link BusWorker}. When the **publish**ed `stream` completes, the
   * registration within the {@link BusWorker} will automatically
   * self-destruct.
   *
   * @param handle - The {@link Bus.Handle} to **publish** under.
   * @param stream - The {@link Observable} `stream` for `handle`.
   * @typeParam T - The type of the **publish**ed {@link Observable} stream.
   * @returns An {@link Observable} of the `stream` **publish**ment.
   *
   * @example
   * **publish** a stream under `'io.github.sgrud.example'`:
   * ```ts
   * import { BusHandler } from '@sgrud/bus';
   * import { of } from 'rxjs';
   *
   * const busHandler = new BusHandler();
   * const handle = 'io.github.sgrud.example';
   * const stream = of('published');
   *
   * busHandler.publish(handle, stream).subscribe();
   * ```
   */
  public publish<T>(
    handle: Bus.Handle,
    stream: ObservableInput<T>
  ): Observable<void> {
    const buffered = connectable(stream, {
      connector: () => new ReplaySubject<T>(),
      resetOnDisconnect: false
    });

    // connect eagerly, the replay subject retains everything emitted so far
    buffered.connect();

    return from(this.worker).pipe(
      switchMap((thread) => thread.publish(handle, buffered))
    );
  }

  /**
   * Invoking this method **uplink**s the supplied `handle` to the supplied
   * `url` by establishing a {@link WebSocket} connection between the endpoint
   * behind the supplied `url` and the {@link BusWorker}. This method returns an
   * {@link Observable} of the **uplink** {@link Subscription} which can be used
   * to cancel the **uplink**. When the **uplink**ed {@link WebSocket} is closed
   * or throws an error, it is automatically cleaned up and unsubscribed from.
   *
   * @param handle - The {@link Bus.Handle} to **uplink**.
   * @param url - The endpoint `url` to establish an **uplink** to.
   * @returns An {@link Observable} of the **uplink** {@link Subscription}.
   *
   * @example
   * **uplink** the `'io.github.sgrud.example'` {@link Bus.Handle}:
   * ```ts
   * import { BusHandler } from '@sgrud/bus';
   *
   * const busHandler = new BusHandler();
   * const handle = 'io.github.sgrud.example';
   * const url = 'https://example.com/websocket';
   *
   * const uplink = busHandler.uplink(handle, url).subscribe();
   * ```
   */
  public uplink(handle: Bus.Handle, url: string): Observable<Subscription> {
    const thread$ = from(this.worker);

    return thread$.pipe(
      switchMap((thread) => thread.uplink(handle, url))
    );
  }

}
 
export type { BusWorker };