diff options
Diffstat (limited to 'node_modules/@mrmlnc/readdir-enhanced/lib/directory-reader.js')
-rw-r--r-- | node_modules/@mrmlnc/readdir-enhanced/lib/directory-reader.js | 380 |
1 files changed, 380 insertions, 0 deletions
diff --git a/node_modules/@mrmlnc/readdir-enhanced/lib/directory-reader.js b/node_modules/@mrmlnc/readdir-enhanced/lib/directory-reader.js new file mode 100644 index 0000000..569d793 --- /dev/null +++ b/node_modules/@mrmlnc/readdir-enhanced/lib/directory-reader.js @@ -0,0 +1,380 @@ +'use strict'; + +const Readable = require('stream').Readable; +const EventEmitter = require('events').EventEmitter; +const path = require('path'); +const normalizeOptions = require('./normalize-options'); +const stat = require('./stat'); +const call = require('./call'); + +/** + * Asynchronously reads the contents of a directory and streams the results + * via a {@link stream.Readable}. + */ +class DirectoryReader { + /** + * @param {string} dir - The absolute or relative directory path to read + * @param {object} [options] - User-specified options, if any (see {@link normalizeOptions}) + * @param {object} internalOptions - Internal options that aren't part of the public API + * @class + */ + constructor (dir, options, internalOptions) { + this.options = options = normalizeOptions(options, internalOptions); + + // Indicates whether we should keep reading + // This is set false if stream.Readable.push() returns false. + this.shouldRead = true; + + // The directories to read + // (initialized with the top-level directory) + this.queue = [{ + path: dir, + basePath: options.basePath, + posixBasePath: options.posixBasePath, + depth: 0 + }]; + + // The number of directories that are currently being processed + this.pending = 0; + + // The data that has been read, but not yet emitted + this.buffer = []; + + this.stream = new Readable({ objectMode: true }); + this.stream._read = () => { + // Start (or resume) reading + this.shouldRead = true; + + // If we have data in the buffer, then send the next chunk + if (this.buffer.length > 0) { + this.pushFromBuffer(); + } + + // If we have directories queued, then start processing the next one + if (this.queue.length > 0) { + if (this.options.facade.sync) { + while (this.queue.length > 0) { + this.readNextDirectory(); + } + } + else { + this.readNextDirectory(); + } + } + + this.checkForEOF(); + }; + } + + /** + * Reads the next directory in the queue + */ + readNextDirectory () { + let facade = this.options.facade; + let dir = this.queue.shift(); + this.pending++; + + // Read the directory listing + call.safe(facade.fs.readdir, dir.path, (err, items) => { + if (err) { + // fs.readdir threw an error + this.emit('error', err); + return this.finishedReadingDirectory(); + } + + try { + // Process each item in the directory (simultaneously, if async) + facade.forEach( + items, + this.processItem.bind(this, dir), + this.finishedReadingDirectory.bind(this, dir) + ); + } + catch (err2) { + // facade.forEach threw an error + // (probably because fs.readdir returned an invalid result) + this.emit('error', err2); + this.finishedReadingDirectory(); + } + }); + } + + /** + * This method is called after all items in a directory have been processed. + * + * NOTE: This does not necessarily mean that the reader is finished, since there may still + * be other directories queued or pending. + */ + finishedReadingDirectory () { + this.pending--; + + if (this.shouldRead) { + // If we have directories queued, then start processing the next one + if (this.queue.length > 0 && this.options.facade.async) { + this.readNextDirectory(); + } + + this.checkForEOF(); + } + } + + /** + * Determines whether the reader has finished processing all items in all directories. + * If so, then the "end" event is fired (via {@Readable#push}) + */ + checkForEOF () { + if (this.buffer.length === 0 && // The stuff we've already read + this.pending === 0 && // The stuff we're currently reading + this.queue.length === 0) { // The stuff we haven't read yet + // There's no more stuff! + this.stream.push(null); + } + } + + /** + * Processes a single item in a directory. + * + * If the item is a directory, and `option.deep` is enabled, then the item will be added + * to the directory queue. + * + * If the item meets the filter criteria, then it will be emitted to the reader's stream. + * + * @param {object} dir - A directory object from the queue + * @param {string} item - The name of the item (name only, no path) + * @param {function} done - A callback function that is called after the item has been processed + */ + processItem (dir, item, done) { + let stream = this.stream; + let options = this.options; + + let itemPath = dir.basePath + item; + let posixPath = dir.posixBasePath + item; + let fullPath = path.join(dir.path, item); + + // If `options.deep` is a number, and we've already recursed to the max depth, + // then there's no need to check fs.Stats to know if it's a directory. + // If `options.deep` is a function, then we'll need fs.Stats + let maxDepthReached = dir.depth >= options.recurseDepth; + + // Do we need to call `fs.stat`? + let needStats = + !maxDepthReached || // we need the fs.Stats to know if it's a directory + options.stats || // the user wants fs.Stats objects returned + options.recurseFn || // we need fs.Stats for the recurse function + options.filterFn || // we need fs.Stats for the filter function + EventEmitter.listenerCount(stream, 'file') || // we need the fs.Stats to know if it's a file + EventEmitter.listenerCount(stream, 'directory') || // we need the fs.Stats to know if it's a directory + EventEmitter.listenerCount(stream, 'symlink'); // we need the fs.Stats to know if it's a symlink + + // If we don't need stats, then exit early + if (!needStats) { + if (this.filter(itemPath, posixPath)) { + this.pushOrBuffer({ data: itemPath }); + } + return done(); + } + + // Get the fs.Stats object for this path + stat(options.facade.fs, fullPath, (err, stats) => { + if (err) { + // fs.stat threw an error + this.emit('error', err); + return done(); + } + + try { + // Add the item's path to the fs.Stats object + // The base of this path, and its separators are determined by the options + // (i.e. options.basePath and options.sep) + stats.path = itemPath; + + // Add depth of the path to the fs.Stats object for use this in the filter function + stats.depth = dir.depth; + + if (this.shouldRecurse(stats, posixPath, maxDepthReached)) { + // Add this subdirectory to the queue + this.queue.push({ + path: fullPath, + basePath: itemPath + options.sep, + posixBasePath: posixPath + '/', + depth: dir.depth + 1, + }); + } + + // Determine whether this item matches the filter criteria + if (this.filter(stats, posixPath)) { + this.pushOrBuffer({ + data: options.stats ? stats : itemPath, + file: stats.isFile(), + directory: stats.isDirectory(), + symlink: stats.isSymbolicLink(), + }); + } + + done(); + } + catch (err2) { + // An error occurred while processing the item + // (probably during a user-specified function, such as options.deep, options.filter, etc.) + this.emit('error', err2); + done(); + } + }); + } + + /** + * Pushes the given chunk of data to the stream, or adds it to the buffer, + * depending on the state of the stream. + * + * @param {object} chunk + */ + pushOrBuffer (chunk) { + // Add the chunk to the buffer + this.buffer.push(chunk); + + // If we're still reading, then immediately emit the next chunk in the buffer + // (which may or may not be the chunk that we just added) + if (this.shouldRead) { + this.pushFromBuffer(); + } + } + + /** + * Immediately pushes the next chunk in the buffer to the reader's stream. + * The "data" event will always be fired (via {@link Readable#push}). + * In addition, the "file", "directory", and/or "symlink" events may be fired, + * depending on the type of properties of the chunk. + */ + pushFromBuffer () { + let stream = this.stream; + let chunk = this.buffer.shift(); + + // Stream the data + try { + this.shouldRead = stream.push(chunk.data); + } + catch (err) { + this.emit('error', err); + } + + // Also emit specific events, based on the type of chunk + chunk.file && this.emit('file', chunk.data); + chunk.symlink && this.emit('symlink', chunk.data); + chunk.directory && this.emit('directory', chunk.data); + } + + /** + * Determines whether the given directory meets the user-specified recursion criteria. + * If the user didn't specify recursion criteria, then this function will default to true. + * + * @param {fs.Stats} stats - The directory's {@link fs.Stats} object + * @param {string} posixPath - The item's POSIX path (used for glob matching) + * @param {boolean} maxDepthReached - Whether we've already crawled the user-specified depth + * @returns {boolean} + */ + shouldRecurse (stats, posixPath, maxDepthReached) { + let options = this.options; + + if (maxDepthReached) { + // We've already crawled to the maximum depth. So no more recursion. + return false; + } + else if (!stats.isDirectory()) { + // It's not a directory. So don't try to crawl it. + return false; + } + else if (options.recurseGlob) { + // Glob patterns are always tested against the POSIX path, even on Windows + // https://github.com/isaacs/node-glob#windows + return options.recurseGlob.test(posixPath); + } + else if (options.recurseRegExp) { + // Regular expressions are tested against the normal path + // (based on the OS or options.sep) + return options.recurseRegExp.test(stats.path); + } + else if (options.recurseFn) { + try { + // Run the user-specified recursion criteria + return options.recurseFn.call(null, stats); + } + catch (err) { + // An error occurred in the user's code. + // In Sync and Async modes, this will return an error. + // In Streaming mode, we emit an "error" event, but continue processing + this.emit('error', err); + } + } + else { + // No recursion function was specified, and we're within the maximum depth. + // So crawl this directory. + return true; + } + } + + /** + * Determines whether the given item meets the user-specified filter criteria. + * If the user didn't specify a filter, then this function will always return true. + * + * @param {string|fs.Stats} value - Either the item's path, or the item's {@link fs.Stats} object + * @param {string} posixPath - The item's POSIX path (used for glob matching) + * @returns {boolean} + */ + filter (value, posixPath) { + let options = this.options; + + if (options.filterGlob) { + // Glob patterns are always tested against the POSIX path, even on Windows + // https://github.com/isaacs/node-glob#windows + return options.filterGlob.test(posixPath); + } + else if (options.filterRegExp) { + // Regular expressions are tested against the normal path + // (based on the OS or options.sep) + return options.filterRegExp.test(value.path || value); + } + else if (options.filterFn) { + try { + // Run the user-specified filter function + return options.filterFn.call(null, value); + } + catch (err) { + // An error occurred in the user's code. + // In Sync and Async modes, this will return an error. + // In Streaming mode, we emit an "error" event, but continue processing + this.emit('error', err); + } + } + else { + // No filter was specified, so match everything + return true; + } + } + + /** + * Emits an event. If one of the event listeners throws an error, + * then an "error" event is emitted. + * + * @param {string} eventName + * @param {*} data + */ + emit (eventName, data) { + let stream = this.stream; + + try { + stream.emit(eventName, data); + } + catch (err) { + if (eventName === 'error') { + // Don't recursively emit "error" events. + // If the first one fails, then just throw + throw err; + } + else { + stream.emit('error', err); + } + } + } +} + +module.exports = DirectoryReader; |