import { HttpService } from "@nestjs/axios"; import { CallHandler, ExecutionContext, Injectable, NestInterceptor } from "@nestjs/common"; import { Observable, map, take } from "rxjs"; import { PageDto } from "../domain/dtos"; import { EsQueryDto } from "../domain/dtos/es-query.dto"; import { RequestDto } from "../domain/dtos/request.dto"; import { SearchQueryDto } from "../domain/dtos/search-q.dto"; import { EsTime } from "../domain/enums/es-time.enum"; import { Order } from "../domain/enums/page-order.enum"; import { PageMeta } from "../domain/interfaces"; import { EsPit } from "../domain/interfaces/es-pit.interface"; /** * Previous search data storage */ class PrevSearch { /** * Constructs an uninitialized object */ constructor() { this.pit = undefined; this.tiebreaker = undefined; this.prevPage = -1; } /** * PIT object of the previous search */ private pit: EsPit; set _pit(pit: EsPit) { this.pit = pit; } get _pit(): EsPit { return this.pit; } /** * Tiebreaker and sort parameters */ private tiebreaker: unknown[]; set _tiebreaker(tiebreaker: unknown[]) { this.tiebreaker = tiebreaker; } get _tiebreaker(): unknown[] { return this.tiebreaker; } /** * Number of the previous page */ private prevPage: number; set _prevPage(page: number) { this.prevPage = page; } get _prevPage(): number { return this.prevPage; } /** * Checks if there was the search before current one * @returns true/false, showing whether or not there was another search before */ public isSet(): boolean { if (this.pit && this.tiebreaker && this.prevPage !== -1) return true; return false; } } /** * Pagination-implementing interceptor */ @Injectable() export class PageInterceptor implements NestInterceptor { /** * Injects needed dependencies and instantiates the storage object * @param httpService * @param searchService */ constructor(private readonly httpService: HttpService) { this.prevSearch = new PrevSearch; } /** * Override of intercept() method, specified in NestInterceptor interface * @param context * @param next * @returns Page with content and metadata */ async intercept(context: ExecutionContext, next: CallHandler): Promise> { let request: RequestDto = context.switchToHttp().getRequest(); const query: SearchQueryDto = request.query; let reverse: boolean = false; request.es_query = new EsQueryDto(); request.es_query.query = { query_string: { query: query.query, default_field: 'content', } }; request.es_query.sort = [ { _score: { order: !query?.order ? Order.DESC : query.order } }, { _shard_doc: 'desc' } ]; if (this.prevSearch.isSet()) { request.es_query.pit = this.prevSearch._pit; request.es_query.search_after = this.prevSearch._tiebreaker; let limit = !query?.limit ? 10 : query.limit; request.es_query.size = limit * Math.abs(query.page - this.prevSearch._prevPage); if (query.page < this.prevSearch._prevPage) { request.es_query.sort = [{ _score: { order: 'asc' } }]; request.es_query.size += limit - 1; reverse = true; } else if (query.page == this.prevSearch._prevPage) { // Caching should be HERE request.es_query.sort = [{ _score: { order: 'asc' } }]; reverse = true; } } else { this.prevSearch._pit = request.es_query.pit = await this.getPIT(1); let limit = !query?.limit ? 10 : query.limit; request.es_query.size = limit * query.page; } return next.handle().pipe( map((res) => { // Setting the page meta-data let meta: PageMeta = { total: res.hits.total.value, pagenum: !query?.page ? 1 : +query.page, order: query?.order?.toUpperCase() === Order.ASC ? Order.ASC : Order.DESC, pagesize: !query?.limit ? 10 : query.limit, hasNext: undefined, hasPrev: undefined, }; meta.hasNext = meta.pagenum * meta.pagesize < meta.total ? true : false; meta.hasPrev = meta.pagenum != 1 ? true : false; // Saving the search info this.prevSearch._pit.id = res.pit_id; this.prevSearch._tiebreaker = res.hits.hits[res.hits.hits.length - 1]?.sort; this.prevSearch._prevPage = query.page; // Check if the performed search is a backwards search let data = res.hits.hits.slice(-meta.pagesize); if (reverse) { this.prevSearch._tiebreaker = data[0]?.sort; data.reverse(); reverse = false; } // Omitting the redundant info and leaving only the document data = data.map((el) => el._source); // Return the page return new PageDto(data, meta); }) ); } /** * Elastichsearch server port-number */ private readonly ES_PORT = process.env.ES_PORT; /** * Info about previously completed search */ private prevSearch: PrevSearch; /** * Acquires a PIT ID from Elasticsearch, needed for a request * @param alive, amount of time in minutes (defaults to 1). If time unit is not specified - defaults to minutes. * @returns PIT object containing PIT ID and keep_alive value */ public async getPIT(alive: number, unit: EsTime = EsTime.min): Promise { return new Promise((resolve, reject) => { try { this.httpService.post(`http://localhost:${this.ES_PORT}/papers/_pit?keep_alive=${alive+unit}`) .pipe(take(1), map(axiosRes => axiosRes.data)) .subscribe((res: EsPit) => { res.keep_alive = alive + unit; resolve(res); }); } catch (error) { reject(error); } }); } /** * Deletes the PIT specified by provided ID * @param pitID, ID of the PIT, that would be deleted * @returns true/false, depending on the result of deletion of the PIT */ async deletePIT(pitID: string): Promise { return new Promise((resolve, reject) => { try { this.httpService.delete(`http://localhost:${this.ES_PORT}/_pit`, { data: { id: pitID }, headers: { 'Content-Type': 'application/json' }, }) .pipe(take(1), map(axiosRes => axiosRes.data)) .subscribe((res) => { resolve(res.succeeded); }); } catch (error) { reject(error); } }) } }