diff --git a/.eslintrc b/.eslintrc index dc6dc67fd..8603ce9e1 100644 --- a/.eslintrc +++ b/.eslintrc @@ -5,6 +5,7 @@ "algolia/typescript" ], "rules": { + "no-continue": "off", "valid-jsdoc": "off", "import/extensions": [ "error", diff --git a/Dockerfile b/Dockerfile index b82fa8dc2..abcdf8265 100644 --- a/Dockerfile +++ b/Dockerfile @@ -34,6 +34,8 @@ RUN true \ # This image must have the minimum amount of layers FROM node:14.16.1-alpine as final +ENV NODE_ENV production + # Do not use root to run the app USER node diff --git a/package.json b/package.json index dd6f7b588..cfe2c6af9 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "npm-search", - "version": "1.0.1", + "version": "1.0.2", "private": true, "author": { "name": "Algolia, Inc.", @@ -35,6 +35,7 @@ "async": "3.2.0", "bunyan": "1.8.15", "bunyan-debug-stream": "2.0.0", + "chalk": "4.1.1", "dtrace-provider": "0.8.8", "escape-html": "1.0.3", "got": "11.8.2", diff --git a/scripts/build.sh b/scripts/build.sh index e98a3cce0..b4dbce554 100755 --- a/scripts/build.sh +++ b/scripts/build.sh @@ -7,6 +7,7 @@ echo "Releasing: $current" echo "" docker build \ + --platform linux/amd64 \ -t algolia/npm-search \ -t "algolia/npm-search:${current}" \ . diff --git a/src/__tests__/saveDocs.test.ts b/src/__tests__/saveDocs.test.ts index 77e0d59ed..34e74e8c1 100644 --- a/src/__tests__/saveDocs.test.ts +++ b/src/__tests__/saveDocs.test.ts @@ -213,7 +213,7 @@ it('should be similar batch vs one', async () => { const row = { id: '', key: 'preact', value: { rev: 'a' }, doc: preact }; await saveDocs({ docs: [row], index }); - await saveDoc({ row, index }); + await saveDoc({ row: preact, index }); expect(index.saveObjects).toHaveBeenCalledWith([clean]); expect(index.saveObject).toHaveBeenCalledWith(clean); diff --git a/src/bootstrap.ts b/src/bootstrap.ts index f5f5b5cc6..f6dd4e126 100644 --- a/src/bootstrap.ts +++ b/src/bootstrap.ts @@ -1,16 +1,23 @@ import type { SearchClient, SearchIndex } from 'algoliasearch'; -import ms from 'ms'; -import type { DocumentListParams } from 'nano'; +import type { QueueObject } from 'async'; +import { queue } from 'async'; +import chalk from 'chalk'; import type { StateManager } from './StateManager'; import * as algolia from './algolia'; import { config } from './config'; import * as npm from './npm'; -import { saveDocs } from './saveDocs'; +import type { PrefetchedPkg } from './npm/Prefetcher'; +import { Prefetcher } from './npm/Prefetcher'; +import { isFailure } from './npm/types'; +import { saveDoc } from './saveDocs'; import { datadog } from './utils/datadog'; import { log } from './utils/log'; +import * as sentry from './utils/sentry'; +import { wait } from './utils/wait'; -let loopStart: number = Date.now(); +let prefetcher: Prefetcher; +let consumer: QueueObject; /** * Bootstrap is the mode that goes from 0 to all the packages in NPM @@ -24,17 +31,20 @@ let loopStart: number = Date.now(); * Watch mode should/can be reliably left running for weeks/months as CouchDB is made for that. * BUT for the moment it's mandatory to relaunch it because it's the only way to update: typescript, downloads stats. */ -async function run( +export async function run( stateManager: StateManager, algoliaClient: SearchClient, mainIndex: SearchIndex, bootstrapIndex: SearchIndex ): Promise { + log.info('-----'); + log.info('⛷ Bootstrap: starting'); const state = await stateManager.check(); if (state.seq && state.seq > 0 && state.bootstrapDone === true) { await algolia.putDefaultSettings(mainIndex, config); log.info('⛷ Bootstrap: done'); + log.info('-----'); return; } @@ -54,70 +64,53 @@ async function run( } log.info('-----'); - log.info(`Total packages ${totalDocs}`); + log.info(chalk.yellowBright`Total packages: ${totalDocs}`); log.info('-----'); - let lastProcessedId = state.bootstrapLastId; - do { - loopStart = Date.now(); - - lastProcessedId = await loop(lastProcessedId, stateManager, bootstrapIndex); - } while (lastProcessedId !== null); - - log.info('-----'); - log.info('⛷ Bootstrap: done'); - await stateManager.save({ - bootstrapDone: true, - bootstrapLastDone: Date.now(), + prefetcher = new Prefetcher({ + nextKey: state.bootstrapLastId, }); + prefetcher.launch(); + + let done = 0; + consumer = createPkgConsumer(stateManager, bootstrapIndex); + consumer.unsaturated(async () => { + const next = await prefetcher.getNext(); + consumer.push(next); + done += 1; + }); + consumer.buffer = 0; - await moveToProduction(stateManager, algoliaClient); -} + let processing = true; + while (processing) { + logProgress(done); -/** - * Execute one loop for bootstrap, - * Fetch N packages from `lastId`, process and save them to Algolia. - * */ -async function loop( - lastId: string | null, - stateManager: StateManager, - bootstrapIndex: SearchIndex -): Promise { - const start = Date.now(); - log.info('loop()', '::', lastId); - - const options: DocumentListParams = { - limit: config.bootstrapConcurrency, - }; - if (lastId) { - options.startkey = lastId; - options.skip = 1; - } + await wait(config.prefetchWaitBetweenPage); - const res = await npm.findAll(options); + processing = !prefetcher.isFinished; + done = 0; - if (res.rows.length <= 0) { - // Nothing left to process - // We return null to stop the bootstraping - return null; + // Push nothing to trigger event + consumer.push(null as any); + processing = false; } - datadog.increment('packages', res.rows.length); - log.info(' - fetched', res.rows.length, 'packages'); - - const newLastId = res.rows[res.rows.length - 1].id; + consumer.pause(); - await saveDocs({ docs: res.rows, index: bootstrapIndex }); + log.info('-----'); + log.info('⛷ Bootstrap: done'); + log.info('-----'); await stateManager.save({ - bootstrapLastId: newLastId, + bootstrapDone: true, + bootstrapLastDone: Date.now(), }); - await logProgress(res.offset, res.rows.length); - - datadog.timing('loop', Date.now() - start); - return newLastId; + await moveToProduction(stateManager, algoliaClient); } +/** + * Move algolia index to prod. + */ async function moveToProduction( stateManager: StateManager, algoliaClient: SearchClient @@ -130,18 +123,64 @@ async function moveToProduction( await stateManager.save(currentState); } -async function logProgress(offset: number, nbDocs: number): Promise { +/** + * Log approximate progress. + */ +async function logProgress(nbDocs: number): Promise { const { nbDocs: totalDocs } = await npm.getInfo(); + const offset = prefetcher.offset; - const ratePerSecond = nbDocs / ((Date.now() - loopStart) / 1000); log.info( - `[progress] %d/%d docs (%d%), current rate: %d docs/s (%s remaining)`, + chalk.dim.italic + .white`[progress] %d/%d docs (%d%) (%s prefetched) (%s processing)`, offset + nbDocs, totalDocs, Math.floor((Math.max(offset + nbDocs, 1) / totalDocs) * 100), - Math.round(ratePerSecond), - ms(((totalDocs - offset - nbDocs) / ratePerSecond) * 1000) + prefetcher.idleCount, + consumer.running() ); } -export { run }; +/** + * Consume packages. + */ +function createPkgConsumer( + stateManager: StateManager, + index: SearchIndex +): QueueObject { + return queue(async (pkg) => { + if (!pkg) { + return; + } + + log.info(`Start:`, pkg.id); + const start = Date.now(); + + try { + datadog.increment('packages'); + + const res = await npm.getDoc(pkg.id); + + if (isFailure(res)) { + log.error('Got an error', res.error); + return; + } + + await saveDoc({ row: res, index }); + + const lastId = (await stateManager.get()).bootstrapLastId; + + // Because of concurrency we can have processed a package after in the list but sooner in the process. + if (!lastId || lastId < pkg.id) { + await stateManager.save({ + bootstrapLastId: pkg.id, + }); + } + } catch (err) { + sentry.report(err); + } finally { + log.info(`Done:`, pkg.id); + datadog.timing('loop', Date.now() - start); + } + }, config.bootstrapConcurrency); +} diff --git a/src/changelog.ts b/src/changelog.ts index 401c3b13f..95aff83ae 100644 --- a/src/changelog.ts +++ b/src/changelog.ts @@ -105,7 +105,6 @@ export async function getChangelog( for (const file of filelist) { const name = path.basename(file.name); if (!fileRegex.test(name)) { - // eslint-disable-next-line no-continue continue; } diff --git a/src/config.ts b/src/config.ts index 83cd661b2..8ef68c5ef 100644 --- a/src/config.ts +++ b/src/config.ts @@ -172,6 +172,8 @@ export const config = { expiresAt: ms('30 days'), popularExpiresAt: ms('7 days'), cacheTotalDownloads: ms('1 minute'), + prefetchWaitBetweenPage: 5000, + prefetchMaxIdle: 100, }; export type Config = typeof config; diff --git a/src/index.ts b/src/index.ts index 28b8dc7db..527d320aa 100644 --- a/src/index.ts +++ b/src/index.ts @@ -34,7 +34,10 @@ async function main(): Promise { createAPI(); // first we make sure the bootstrap index has the correct settings - log.info('💪 Setting up Algolia'); + log.info('💪 Setting up Algolia', [ + config.bootstrapIndexName, + config.indexName, + ]); const { client: algoliaClient, mainIndex, @@ -56,7 +59,6 @@ async function main(): Promise { // then we figure out which updates we missed since // the last time main index was updated - log.info('🚀 Launching Watch'); await watch.run(stateManager, mainIndex); } diff --git a/src/jsDelivr/index.ts b/src/jsDelivr/index.ts index 883a4d056..cbb9f44e9 100644 --- a/src/jsDelivr/index.ts +++ b/src/jsDelivr/index.ts @@ -95,7 +95,7 @@ export async function getFilesList( }); files = response.body.files; } catch (e) { - log.error(`Failed to fetch ${url}`, e); + log.error(`Failed to fetch ${url}`, e.message); } datadog.timing('jsdelivr.getFilesList', Date.now() - start); diff --git a/src/npm/Prefetcher.ts b/src/npm/Prefetcher.ts new file mode 100644 index 000000000..ab4e38e18 --- /dev/null +++ b/src/npm/Prefetcher.ts @@ -0,0 +1,80 @@ +import type { DocumentListParams } from 'nano'; + +import { config } from '../config'; +import { log } from '../utils/log'; +import { wait } from '../utils/wait'; + +import * as npm from './index'; + +export type PrefetchedPkg = { id: string }; + +export class Prefetcher { + #limit: number = config.bootstrapConcurrency; + #ready: PrefetchedPkg[] = []; + #nextKey: string | null = null; + #running: boolean = false; + #offset: number = 0; + #finished: boolean = false; + #maxIdle = config.prefetchMaxIdle; + + constructor(opts: { nextKey: string | null }) { + this.#nextKey = opts.nextKey; + } + + get offset(): number { + return this.#offset + this.#limit - this.#ready.length; + } + + get idleCount(): number { + return this.#ready.length; + } + + get isFinished(): boolean { + return this.#finished; + } + + async getNext(): Promise { + while (this.#ready.length <= 0) { + await wait(100); + } + + return this.#ready.shift()!; + } + + async launch(): Promise { + this.#running = true; + while (this.#running) { + if (this.#ready.length >= this.#maxIdle) { + await wait(config.prefetchWaitBetweenPage); + continue; + } + + await this.fetchOnePage(); + } + } + + private async fetchOnePage(): Promise { + const options: Partial = { + limit: this.#limit, + include_docs: false, + }; + + if (this.#nextKey) { + options.startkey = this.#nextKey; + options.skip = 1; + } + const { rows: packages, offset } = await npm.findAll(options); + + if (packages.length <= 0) { + this.#finished = true; + this.#running = false; + this.#offset = offset; + log.info('[pf] done'); + return; + } + + this.#ready.push(...packages); + this.#offset = offset; + this.#nextKey = packages[packages.length - 1].id; + } +} diff --git a/src/npm/index.ts b/src/npm/index.ts index bcaa8e924..e0b78d9c8 100644 --- a/src/npm/index.ts +++ b/src/npm/index.ts @@ -3,6 +3,7 @@ import type { DatabaseChangesParams, DatabaseChangesResponse, DocumentFetchResponse, + DocumentGetResponse, DocumentListParams, DocumentListResponse, DocumentScopeFollowUpdatesParams, @@ -83,6 +84,16 @@ async function getChanges( return results; } +async function getDoc(name: string): Promise { + const start = Date.now(); + + const doc = await db.get(name); + + datadog.timing('npm.getDoc', Date.now() - start); + + return doc; +} + async function getDocs({ keys, }: { @@ -148,25 +159,49 @@ async function getInfo(): Promise<{ nbDocs: number; seq: number }> { }; } -/** - * Validate if a package exists. - */ -async function validatePackageExists(pkgName: string): Promise { - const start = Date.now(); - - let exists: boolean; - try { - const response = await request(`${config.npmRootEndpoint}/${pkgName}`, { - method: 'HEAD', - }); - exists = response.statusCode === 200; - } catch (e) { - exists = false; - } - - datadog.timing('npm.validatePackageExists', Date.now() - start); - return exists; -} +// /** +// * Get a package version. +// * +// * Doc: https://github.com/npm/registry/blob/master/docs/responses/package-metadata.md. +// */ +// async function getPackageLight(pkgName: string): Promise { +// const start = Date.now(); + +// const { body } = await request( +// `${config.npmRootEndpoint}/${pkgName}`, +// { +// method: 'GET', +// headers: { +// Accept: 'application/vnd.npm.install-v1+json', +// }, +// responseType: 'json', +// } +// ); + +// datadog.timing('npm.getPackageLight', Date.now() - start); +// return body; +// } + +// /** +// * Get a package version. +// */ +// async function getPackageAtVersion( +// pkgName: string, +// version: string +// ): Promise { +// const start = Date.now(); + +// const { body } = await request( +// `${config.npmRootEndpoint}/${pkgName}/${version}`, +// { +// method: 'GET', +// responseType: 'json', +// } +// ); + +// datadog.timing('npm.getPackageLight', Date.now() - start); +// return body; +// } /** * Get list of packages that depends of them. @@ -367,7 +402,7 @@ export { getChanges, getInfo, getDocs, - validatePackageExists, + getDoc, getDependents, getDependent, getDownload, diff --git a/src/npm/types.ts b/src/npm/types.ts index 61c5e53f1..f8eb7f720 100644 --- a/src/npm/types.ts +++ b/src/npm/types.ts @@ -1,3 +1,5 @@ +import type { DocumentLookupFailure } from 'nano'; + export interface PackageDownload { downloads: number; package: string; @@ -65,3 +67,14 @@ export interface GetPackage { repository?: PackageRepo; schematics?: string; } + +export interface GetPackageLight { + name: string; + 'dist-tags': Record; + versions: Record>; + modified: string; +} + +export function isFailure(change: any): change is DocumentLookupFailure { + return change.error && !change.id; +} diff --git a/src/saveDocs.ts b/src/saveDocs.ts index b863d9c05..ec6ba6274 100644 --- a/src/saveDocs.ts +++ b/src/saveDocs.ts @@ -42,6 +42,7 @@ export async function saveDocs({ return Promise.resolve(0); } log.info(' => ', names); + log.info(' Adding metadata...'); let start2 = Date.now(); @@ -64,12 +65,12 @@ export async function saveDoc({ row, index, }: { - row: DocumentResponseRow; + row: GetPackage; index: SearchIndex; }): Promise { const start = Date.now(); - const formatted = formatPkg(row.doc!); + const formatted = formatPkg(row); datadog.timing('formatPkg', Date.now() - start); @@ -77,21 +78,14 @@ export async function saveDoc({ return; } - log.info(' => ', formatted.name); - log.info(' Adding metadata...'); - let start2 = Date.now(); const pkg = await addMetaData(formatted); datadog.timing('saveDocs.addMetaData.one', Date.now() - start2); - log.info(` Saving...`); - start2 = Date.now(); await index.saveObject(pkg); datadog.timing('saveDocs.saveObject.one', Date.now() - start2); - log.info(` Saved`); - datadog.timing('saveDocs.one', Date.now() - start); } diff --git a/src/typescript/index.ts b/src/typescript/index.ts index 6dcaecc83..3d5fb87f3 100644 --- a/src/typescript/index.ts +++ b/src/typescript/index.ts @@ -79,7 +79,6 @@ export function getTypeScriptSupport( for (const file of filelist) { if (!file.name.endsWith('.d.ts')) { - // eslint-disable-next-line no-continue continue; } diff --git a/src/utils/log.ts b/src/utils/log.ts index 83250f286..0a8f56b69 100644 --- a/src/utils/log.ts +++ b/src/utils/log.ts @@ -5,7 +5,8 @@ const stream = bunyanDebugStream({ showDate: process.env.NODE_ENV !== 'production', showProcess: false, showLoggerName: false, - showPid: process.env.NODE_ENV !== 'production', + showPid: false, + showLevel: process.env.NODE_ENV === 'production', }); export const log = bunyan.createLogger({ diff --git a/src/utils/wait.ts b/src/utils/wait.ts new file mode 100644 index 000000000..ba18f7529 --- /dev/null +++ b/src/utils/wait.ts @@ -0,0 +1,6 @@ +// Coming in nodejs 16 +export function wait(waitTime: number): Promise { + return new Promise((resolve) => { + setTimeout(resolve, waitTime); + }); +} diff --git a/src/watch.ts b/src/watch.ts index 285cccfe3..fc9b5249b 100644 --- a/src/watch.ts +++ b/src/watch.ts @@ -1,11 +1,13 @@ import type { SearchIndex } from 'algoliasearch'; import type { QueueObject } from 'async'; import { queue } from 'async'; -import type { DatabaseChangesResultItem, DocumentLookupFailure } from 'nano'; +import chalk from 'chalk'; +import type { DatabaseChangesResultItem } from 'nano'; import type { StateManager } from './StateManager'; import * as npm from './npm'; -import { saveDocs } from './saveDocs'; +import { isFailure } from './npm/types'; +import { saveDoc } from './saveDocs'; import { datadog } from './utils/datadog'; import { log } from './utils/log'; import * as sentry from './utils/sentry'; @@ -36,10 +38,14 @@ let changesConsumer: QueueObject; * It will never be up to date because he receive event at the same pace * as they arrive in listener A, even if it's not the same package. */ -async function run( +export async function run( stateManager: StateManager, mainIndex: SearchIndex ): Promise { + log.info('-----'); + log.info('🚀 Watch: starting'); + log.info('-----'); + await stateManager.save({ stage: 'watch', }); @@ -48,7 +54,9 @@ async function run( await watch(stateManager); - log.info('🚀 watch is done'); + log.info('-----'); + log.info('🚀 Watch: done'); + log.info('-----'); } /** @@ -93,7 +101,6 @@ async function loop( mainIndex: SearchIndex, change: DatabaseChangesResultItem ): Promise { - const start = Date.now(); datadog.increment('packages'); if (!change.id) { @@ -106,20 +113,18 @@ async function loop( // Delete package directly in index // Filter does not support async/await but there is no concurrency issue with this mainIndex.deleteObject(change.id); - log.info(`🚀 Deleted ${change.id}`); + log.info(`Deleted`, change.id); return; } - const doc = (await npm.getDocs({ keys: [change.id] })).rows[0]; + const res = await npm.getDoc(change.id); - if (isFailure(doc)) { - log.error('Got an error', doc.error); + if (isFailure(res)) { + log.error('Got an error', res.error); return; } - await saveDocs({ docs: [doc], index: mainIndex }); - - datadog.timing('watch.loop', Date.now() - start); + await saveDoc({ row: res, index: mainIndex }); } /** @@ -128,10 +133,12 @@ async function loop( */ function logProgress(seq: number): void { log.info( - `🚀 Synced %d/%d changes (%d%)`, + chalk.dim.italic + .white`[progress] Synced %d/%d changes (%d%) (%s remaining)`, seq, totalSequence, - Math.floor((Math.max(seq, 1) / totalSequence) * 100) + Math.floor((Math.max(seq, 1) / totalSequence) * 100), + totalSequence - seq ); } @@ -151,22 +158,22 @@ function createChangeConsumer( mainIndex: SearchIndex ): QueueObject { return queue(async (change) => { + const start = Date.now(); + const seq = change.seq; - log.info(`🚀 Received change [%s]`, seq); + log.info(`Start:`, change.id); + try { await loop(mainIndex, change); await stateManager.save({ seq, }); - logProgress(seq); } catch (err) { sentry.report(err); + } finally { + log.info(`Done:`, change.id); + logProgress(seq); + datadog.timing('watch.loop', Date.now() - start); } }, 1); } - -function isFailure(change: any): change is DocumentLookupFailure { - return change.error && !change.id; -} - -export { run }; diff --git a/yarn.lock b/yarn.lock index d27fb1a8b..6deaa73b4 100644 --- a/yarn.lock +++ b/yarn.lock @@ -2244,6 +2244,14 @@ caseless@~0.12.0: version "0.12.0" resolved "https://registry.yarnpkg.com/caseless/-/caseless-0.12.0.tgz#1b681c21ff84033c826543090689420d187151dc" +chalk@4.1.1, chalk@^4.0.0, chalk@^4.1.0, chalk@^4.1.1: + version "4.1.1" + resolved "https://registry.yarnpkg.com/chalk/-/chalk-4.1.1.tgz#c80b3fab28bf6371e6863325eee67e618b77e6ad" + integrity sha512-diHzdDKxcU+bAsUboHLPEDQiw0qEe0qd7SYUn3HgcFlWgbDcfLGswOHYeGrHKzG9z6UYf01d9VFMfZxPM1xZSg== + dependencies: + ansi-styles "^4.1.0" + supports-color "^7.1.0" + chalk@^2.0.0: version "2.3.0" resolved "https://registry.yarnpkg.com/chalk/-/chalk-2.3.0.tgz#b5ea48efc9c1793dccc9b4767c93914d3f2d52ba" @@ -2261,14 +2269,6 @@ chalk@^2.3.2: escape-string-regexp "^1.0.5" supports-color "^5.3.0" -chalk@^4.0.0, chalk@^4.1.0, chalk@^4.1.1: - version "4.1.1" - resolved "https://registry.yarnpkg.com/chalk/-/chalk-4.1.1.tgz#c80b3fab28bf6371e6863325eee67e618b77e6ad" - integrity sha512-diHzdDKxcU+bAsUboHLPEDQiw0qEe0qd7SYUn3HgcFlWgbDcfLGswOHYeGrHKzG9z6UYf01d9VFMfZxPM1xZSg== - dependencies: - ansi-styles "^4.1.0" - supports-color "^7.1.0" - char-regex@^1.0.2: version "1.0.2" resolved "https://registry.yarnpkg.com/char-regex/-/char-regex-1.0.2.tgz#d744358226217f981ed58f479b1d6bcc29545dcf"