diff --git a/src/source/load_vector_tile.ts b/src/source/load_vector_tile.ts index 69ea24366f5..afcbfa2de45 100644 --- a/src/source/load_vector_tile.ts +++ b/src/source/load_vector_tile.ts @@ -1,10 +1,12 @@ import {VectorTile} from '@mapbox/vector-tile'; import Protobuf from 'pbf'; import {getArrayBuffer} from '../util/ajax'; +import assert from "assert"; import type {Callback} from '../types/callback'; import type {RequestedTileParameters} from './worker_source'; import type Scheduler from '../util/scheduler'; +import type {Cancelable} from '../types/cancelable'; export type LoadVectorTileResult = { rawData: ArrayBuffer; @@ -23,7 +25,29 @@ export type LoadVectorTileResult = { export type LoadVectorDataCallback = Callback; export type AbortVectorData = () => void; -export type LoadVectorData = (params: RequestedTileParameters, callback: LoadVectorDataCallback) => AbortVectorData | undefined; +export type LoadVectorData = (params: RequestedTileParameters, callback: LoadVectorDataCallback, deduped: DedupedRequest) => AbortVectorData | undefined; +export type DedupedRequestInput = {key : string, + metadata: any, + requestFunc: any, + callback: LoadVectorDataCallback, + fromQueue?: boolean +}; +export type VectorTileQueueEntry = DedupedRequestInput & { + cancelled: boolean, + cancel: () => void +}; + +let requestQueue: Map, numRequests: number; +export const resetRequestQueue = () => { + requestQueue = new Map(); + numRequests = 0; +}; +resetRequestQueue(); + +const filterQueue = (key: string) => { + requestQueue.delete(key); +}; + export class DedupedRequest { entries: { [key: string]: any; @@ -35,62 +59,153 @@ export class DedupedRequest { this.scheduler = scheduler; } - request(key: string, metadata: any, request: any, callback: LoadVectorDataCallback): () => void { - const entry = this.entries[key] = this.entries[key] || {callbacks: []}; + addToSchedulerOrCallDirectly({ + callback, + metadata, + err, + result, + }: { + callback: LoadVectorDataCallback; + metadata: any; + err: Error | null | undefined; + result: any; + }) { + if (this.scheduler) { + this.scheduler.add(() => { + callback(err, result); + }, metadata); + } else { + callback(err, result); + } + } + + getEntry = (key: string) => { + return ( + this.entries[key] || { + // use a set to avoid duplicate callbacks being added when calling from queue + callbacks: new Set(), + } + ); + }; + + request({key, metadata, requestFunc, callback, fromQueue}: DedupedRequestInput): Cancelable { + const entry = (this.entries[key] = this.getEntry(key)); + + const removeCallbackFromEntry = ({key, requestCallback}) => { + const entry = this.getEntry(key); + if (entry.result) { + return; + } + entry.callbacks.delete(requestCallback); + if (entry.callbacks.size) { + return; + } + if (entry.cancel) { + entry.cancel(); + } + filterQueue(key); + delete this.entries[key]; + }; + + let advanced = false; + const advanceRequestQueue = () => { + if (advanced) { + return; + } + advanced = true; + numRequests--; + assert(numRequests >= 0); + while (requestQueue.size && numRequests < 50) { + const request = requestQueue.values().next().value; + const {key, metadata, requestFunc, callback, cancelled} = request; + filterQueue(key); + if (!cancelled) { + request.cancel = this.request({ + key, + metadata, + requestFunc, + callback, + fromQueue: true + }).cancel; + } + } + }; if (entry.result) { const [err, result] = entry.result; - if (this.scheduler) { - this.scheduler.add(() => { - callback(err, result); - }, metadata); - } else { - callback(err, result); - } - return () => {}; + this.addToSchedulerOrCallDirectly({ + callback, + metadata, + err, + result, + }); + return {cancel: () => {}}; } - entry.callbacks.push(callback); + entry.callbacks.add(callback); + + const inQueue = requestQueue.has(key); + if ((!entry.cancel && !inQueue) || fromQueue) { + // Lack of attached cancel handler means this is the first request for this resource + if (numRequests >= 50) { + const queued = { + key, + metadata, + requestFunc, + callback, + cancelled: false, + cancel() {}, + }; + const cancelFunc = () => { + queued.cancelled = true; + removeCallbackFromEntry({ + key, + requestCallback: callback, + }); + }; + queued.cancel = cancelFunc; + requestQueue.set(key, queued); + return queued; + } + numRequests++; - if (!entry.cancel) { - entry.cancel = request((err, result) => { + const actualRequestCancel = requestFunc((err, result) => { entry.result = [err, result]; + for (const cb of entry.callbacks) { - if (this.scheduler) { - this.scheduler.add(() => { - cb(err, result); - }, metadata); - } else { - cb(err, result); - } + this.addToSchedulerOrCallDirectly({ + callback: cb, + metadata, + err, + result, + }); } - setTimeout(() => delete this.entries[key], 1000 * 3); + + filterQueue(key); + advanceRequestQueue(); + + setTimeout(() => { + delete this.entries[key]; + }, 1000 * 3); }); + entry.cancel = actualRequestCancel; } - return () => { - if (entry.result) return; - entry.callbacks = entry.callbacks.filter(cb => cb !== callback); - if (!entry.callbacks.length) { - entry.cancel(); - delete this.entries[key]; - } + return { + cancel() { + removeCallbackFromEntry({ + key, + requestCallback: callback, + }); + }, }; } } -/** - * @private - */ -export function loadVectorTile( - params: RequestedTileParameters, - callback: LoadVectorDataCallback, - skipParse?: boolean, -): () => void { - const key = JSON.stringify(params.request); +const makeArrayBufferHandler = ({requestParams, skipParse}) => { const makeRequest = (callback: LoadVectorDataCallback) => { - const request = getArrayBuffer(params.request, (err?: Error | null, data?: ArrayBuffer | null, cacheControl?: string | null, expires?: string | null) => { + const request = getArrayBuffer(requestParams, (err?: Error | null, data?: ArrayBuffer | null, cacheControl?: string | null, expires?: string | null) => { if (err) { callback(err); } else if (data) { @@ -108,11 +223,37 @@ export function loadVectorTile( }; }; + return makeRequest; +}; + +/** + * @private + */ +export function loadVectorTile( + params: RequestedTileParameters, + callback: LoadVectorDataCallback, + deduped: DedupedRequest, + skipParse?: boolean, + providedArrayBufferHandlerMaker?: any +): () => void { + const key = JSON.stringify(params.request); + + const arrayBufferCallbackMaker = providedArrayBufferHandlerMaker || makeArrayBufferHandler; + const makeRequest = arrayBufferCallbackMaker({requestParams: params.request, skipParse}); + if (params.data) { // if we already got the result earlier (on the main thread), return it directly - (this.deduped as DedupedRequest).entries[key] = {result: [null, params.data]}; + deduped.entries[key] = {result: [null, params.data]}; } const callbackMetadata = {type: 'parseTile', isSymbolTile: params.isSymbolTile, zoom: params.tileZoom}; - return (this.deduped as DedupedRequest).request(key, callbackMetadata, makeRequest, callback); + const dedupedAndQueuedRequest = deduped.request({ + key, + metadata: callbackMetadata, + requestFunc: makeRequest, + callback, + fromQueue: false + }); + + return dedupedAndQueuedRequest.cancel; } diff --git a/src/source/vector_tile_source.ts b/src/source/vector_tile_source.ts index 9f724e2adab..be33ef04a70 100644 --- a/src/source/vector_tile_source.ts +++ b/src/source/vector_tile_source.ts @@ -290,7 +290,7 @@ class VectorTileSource extends Evented implements ISource { // if workers are not ready to receive messages yet, use the idle time to preemptively // load tiles on the main thread and pass the result instead of requesting a worker to do so if (!this.dispatcher.ready) { - const cancel = loadVectorTile.call({deduped: this._deduped}, params, (err?: Error | null, data?: LoadVectorTileResult | null) => { + const cancel = loadVectorTile.call({}, params, (err?: Error | null, data?: LoadVectorTileResult | null) => { if (err || !data) { done.call(this, err); } else { @@ -302,7 +302,7 @@ class VectorTileSource extends Evented implements ISource { }; if (tile.actor) tile.actor.send('loadTile', params, done.bind(this), undefined, true); } - }, true); + }, this._deduped, true); tile.request = {cancel}; } else { diff --git a/src/source/vector_tile_worker_source.ts b/src/source/vector_tile_worker_source.ts index 29431ce3c93..a19cfefcb6b 100644 --- a/src/source/vector_tile_worker_source.ts +++ b/src/source/vector_tile_worker_source.ts @@ -69,16 +69,18 @@ class VectorTileWorkerSource extends Evented implements WorkerSource { */ loadTile(params: WorkerTileParameters, callback: WorkerTileCallback) { const uid = params.uid; - const requestParam = params && params.request; const perf = requestParam && requestParam.collectResourceTiming; const workerTile = this.loading[uid] = new WorkerTile(params); workerTile.abort = this.loadVectorData(params, (err, response) => { const aborted = !this.loading[uid]; - delete this.loading[uid]; + if (workerTile.status === 'done') { + return; + } + if (aborted || err || !response) { workerTile.status = 'done'; if (!aborted) this.loaded[uid] = workerTile; @@ -128,7 +130,7 @@ class VectorTileWorkerSource extends Evented implements WorkerSource { this.loaded = this.loaded || {}; this.loaded[uid] = workerTile; - }); + }, this.deduped); } /** diff --git a/test/unit/source/load_vector_tile.test.ts b/test/unit/source/load_vector_tile.test.ts new file mode 100644 index 00000000000..259005a827f --- /dev/null +++ b/test/unit/source/load_vector_tile.test.ts @@ -0,0 +1,104 @@ +import {test, expect, vi, beforeEach} from '../../util/vitest'; +import {loadVectorTile, DedupedRequest, resetRequestQueue} from '../../../src/source/load_vector_tile'; +import type {RequestedTileParameters} from '../../../src/source/worker_source'; + +const createScheduler = () => ({add: () => {}} as any); +const ARRAY_BUF_DELAY = 1500; +const MAX_REQUESTS = 50; +const arrayBufResolutionSpy = vi.fn(); + +const cancellableDelayedArrayBufRequestMaker = ({requestParams}) => { + return (callback) => { + let cancelled = false; + setTimeout(() => { + if (!cancelled) { + arrayBufResolutionSpy(requestParams); + } + callback(null, {}); + }, ARRAY_BUF_DELAY); + return () => { + cancelled = true; + }; + }; +}; + +const makeRequests = ({numberOfRequests, deduped}: {numberOfRequests: number, deduped: DedupedRequest}) => { + for (let i = 0; i < numberOfRequests; i++) { + loadVectorTile({request: {url: String(i)}} as RequestedTileParameters, () => {}, deduped, false, cancellableDelayedArrayBufRequestMaker); + } +}; + +beforeEach(() => { + arrayBufResolutionSpy.mockRestore(); + resetRequestQueue(); +}); + +test('loadVectorTile does not make array buffer request for duplicate tile requests', () => { + vi.useFakeTimers(); + const deduped = new DedupedRequest(createScheduler()); + const params = {request: {url: 'http://localhost:2900/fake.pbf'}} as RequestedTileParameters; + + loadVectorTile(params, () => {}, deduped, false, cancellableDelayedArrayBufRequestMaker); + loadVectorTile(params, () => {}, deduped, false, cancellableDelayedArrayBufRequestMaker); + loadVectorTile(params, () => {}, deduped, false, cancellableDelayedArrayBufRequestMaker); + + vi.advanceTimersByTime(ARRAY_BUF_DELAY); + expect(arrayBufResolutionSpy).toHaveBeenCalledTimes(1); + vi.useRealTimers(); +}); + +test('only processes concurrent requests up to the queue limit', () => { + vi.useFakeTimers(); + const deduped = new DedupedRequest(createScheduler()); + + makeRequests({numberOfRequests: 2 * MAX_REQUESTS, deduped}); + + vi.advanceTimersByTime(ARRAY_BUF_DELAY); + expect(arrayBufResolutionSpy).toHaveBeenCalledTimes(MAX_REQUESTS); + vi.useRealTimers(); +}); + +test('processes other items within the queue after earlier ones resolve', () => { + vi.useFakeTimers(); + const deduped = new DedupedRequest(createScheduler()); + + makeRequests({numberOfRequests: 3 * MAX_REQUESTS, deduped}); + + vi.advanceTimersByTime(ARRAY_BUF_DELAY); + expect(arrayBufResolutionSpy).toHaveBeenCalledTimes(MAX_REQUESTS); + expect(arrayBufResolutionSpy).toHaveBeenLastCalledWith({url: String(MAX_REQUESTS - 1)}); + + vi.advanceTimersByTime(ARRAY_BUF_DELAY); + expect(arrayBufResolutionSpy).toHaveBeenCalledTimes(MAX_REQUESTS * 2); + expect(arrayBufResolutionSpy).toHaveBeenLastCalledWith({url: String((MAX_REQUESTS * 2) - 1)}); + vi.useRealTimers(); +}); + +test('entries that are cancelled whilst in the queue do not send array buffer requests', () => { + vi.useFakeTimers(); + const deduped = new DedupedRequest(createScheduler()); + + makeRequests({numberOfRequests: MAX_REQUESTS, deduped}); + const cancel = loadVectorTile({request: {url: "abort"}} as RequestedTileParameters, () => {}, deduped, false, cancellableDelayedArrayBufRequestMaker); + cancel(); + + vi.advanceTimersByTime(2 * ARRAY_BUF_DELAY); + expect(arrayBufResolutionSpy).toHaveBeenCalledTimes(MAX_REQUESTS); + expect(arrayBufResolutionSpy).not.toHaveBeenCalledWith({url: "abort"}); + + vi.useRealTimers(); +}); + +test('entries cancelled outside the queue do not send array buffer requests', () => { + vi.useFakeTimers(); + const deduped = new DedupedRequest(createScheduler()); + + const cancel = loadVectorTile({request: {url: "abort"}} as RequestedTileParameters, () => {}, deduped, false, cancellableDelayedArrayBufRequestMaker); + cancel(); + + vi.advanceTimersByTime(ARRAY_BUF_DELAY); + expect(arrayBufResolutionSpy).not.toHaveBeenCalled(); + expect(arrayBufResolutionSpy).not.toHaveBeenCalledWith({url: "abort"}); + + vi.useRealTimers(); +});