Web streams



Resources
Principle

Massive data processing relies on data streams because data cannot reside in memory as a whole. Instead, data arrive as chunks, each of which holds only part of the data. In this context, the Web Streams Application Programming Interface (API) hides memory issues: computations are expressed on data even though not all data are present yet; they arrive progressively from a Web source.

ReadableStream
const decompression_stream = new DecompressionStream("gzip");
fetch("https://bulk.openweathermap.org/sample/city.list.json.gz").then(async (response) => {
    // 'response.body' is 'null' when the response has no payload; an error status (404...) would not be gzip data either:
    if (!response.ok || response.body === null)
        throw new Error(`'fetch' returned no usable body (HTTP status ${response.status})...`);
    // 'response.body' is a 'ReadableStream' object:
    console.assert(response.body.constructor.name === 'ReadableStream');
    console.assert(response.body.locked === false /*&& response.body.state === 'readable'*/);

    // 'DecompressionStream' emits raw bytes, i.e., 'Uint8Array' chunks (not numbers):
    const data_stream: ReadableStream<Uint8Array> = response.body.pipeThrough(decompression_stream);
    for await (const chunk of Chunks(data_stream.getReader())) {
        console.assert(data_stream.locked); // 'getReader()' locks the stream...
        // Raw data stream:
        console.assert(chunk.constructor.name === 'Uint8Array');
        // console.info(`Chunk of size ${chunk.length}... with raw data: ${chunk}`);
        // Data stream as (incomplete) JSON. Result *CANNOT* be parsed as JSON:
        // console.info((new TextDecoder()).decode(chunk));
        /**
         * Caution, loss of data ('import * as Parser_with_data_loss from 'partial-json-parser';'):
        */
        const incomplete_data: Array<City> = Parser_with_data_loss((new TextDecoder()).decode(chunk));
        console.assert(incomplete_data.constructor.name === 'Array');
        // Stop, first chunk only for test:
        break;
    }
}).catch((error: unknown) => {
    // Floating promise: without this handler, a network or stream failure is an unhandled rejection:
    console.error("'fetch' or stream processing failed... " + error);
});

Creation of a ReadStream object (file system) to be served as a ReadableStream object (Web)

const application = new Koa();
// Single middleware: streams the local JSON file for its dedicated URL; other URLs are left to Koa's default handling.
application.use((ctx: any /* Koa 'Context' type */) => {
    if (ctx.request.url !== '/Prison_de_Nantes.json')
        return;
    ctx.response.set('content-type', 'application/json');
    // The response body is a file system stream rather than an in-memory string:
    ctx.body = file_system.createReadStream('./Prison_de_Nantes.json');
});
http.createServer(application.callback()).listen(1963);
TextDecoderStream
fetch("https://FranckBarbier.com/resources/Prison_de_Nantes/Prison_de_Nantes.json").then(async (response) => {
    // 'response.body' may be 'null'; an error status would stream an error page instead of the JSON file:
    if (!response.ok || response.body === null)
        throw new Error(`'fetch' returned no usable body (HTTP status ${response.status})...`);
    // 'TextDecoderStream' transforms raw data into strings (a streaming decoder, so UTF-8 sequences split across chunks are kept intact):
    const data_stream = response.body.pipeThrough(new TextDecoderStream());
    for await (const chunk of Chunks(data_stream.getReader())) {
        console.assert(chunk.constructor.name === 'String');
        console.info(`Chunk of size ${chunk.length}... with string data: ${chunk}`);
        // Stop, first chunk only for test:
        break;
    }
}).catch((error: unknown) => {
    // Floating promise: without this handler, a network or stream failure is an unhandled rejection:
    console.error("'fetch' or stream processing failed... " + error);
});
TransformStream
/**
 * Shape of a city record parsed from the OpenWeatherMap 'city.list.json' sample.
 * 'UTM' is absent from the source data: it is filled in by the transformation stream.
 */
interface City {
    id: number;
    name: string;
    state: string;
    country: string;
    coord?: { lon: number; lat: number };
    // UTM coordinates, computed by the transformation stream:
    UTM?: {
        Easting: number;
        Northing: number;
        ZoneNumber: number;
        ZoneLetter: string;
    };
}

// https://exploringjs.com/nodejs-shell-scripting/ch_web-streams.html#implementing-custom-transformstreams
/**
 * 'Transformer' from raw bytes (expected: the beginning of a JSON array of cities) to 'City' objects
 * enriched with UTM coordinates. For test purposes, the stream is terminated after the first chunk.
 */
class Compute_UTM_from_lat_lon implements Transformer<Uint8Array, City> {
    private static readonly _Precision = 1;
    private static readonly _UTM = new UTM; // Default Ellipsoid is 'WGS 84'
    private readonly _text_decoder: TextDecoder = new TextDecoder();

    start(controller: TransformStreamDefaultController<City>): void | Promise<void> {
        console.info('Any initialization?');
    }

    /**
     * Decodes 'chunk', parses it as (possibly truncated) JSON, enqueues every city that has complete
     * 'coord' data after setting its 'UTM' field, then terminates the stream.
     */
    transform(chunk: Uint8Array, controller: TransformStreamDefaultController<City>): void | Promise<void> {
        // '{ stream: true }': a multi-byte UTF-8 sequence cut at the end of 'chunk' is buffered for the next
        // 'decode' call instead of being replaced with U+FFFD:
        const text = this._text_decoder.decode(chunk, { stream: true });
        /**
         * Caution, loss of data ('import * as Parser_with_data_loss from 'partial-json-parser';'):
         */
        const parsed: unknown = Parser_with_data_loss(text);
        // NOTE(review): a non-array parse result (unexpected chunk content) yields no city instead of a crash:
        const cities: Array<City> = Array.isArray(parsed) ? parsed : [];
        for (const city of cities) {
            const coord = city.coord;
            // The lossy parser may return a truncated last city whose 'coord' lacks 'lat' or 'lon':
            if (coord !== undefined && coord !== null && typeof coord.lat === 'number' && typeof coord.lon === 'number') {
                // NOTE(review): return shape of 'convertLatLngToUtm' for out-of-range latitudes not verified here:
                city.UTM = Compute_UTM_from_lat_lon._UTM.convertLatLngToUtm(coord.lat, coord.lon, Compute_UTM_from_lat_lon._Precision);
                controller.enqueue(city);
            }
        }
        controller.terminate(); // Stop, first chunk only for test...
    }

    flush(controller: TransformStreamDefaultController<City>): void | Promise<void> {
        // Any finalization?
    }
}


/**
 * Downloads the gzip-compressed OpenWeatherMap city list, decompresses it, pipes it through
 * 'Compute_UTM_from_lat_lon' and logs every resulting 'City' object.
 * Failures (network, missing body, error status, stream) are logged instead of being left as unhandled rejections.
 */
export const Transformation = () => {
    const decompression_stream = new DecompressionStream("gzip");
    fetch("https://bulk.openweathermap.org/sample/city.list.json.gz").then(async (response) => {
        if (!response.ok || response.body === null)
            throw new Error(`'fetch' returned no usable body (HTTP status ${response.status})...`);
        // 'DecompressionStream' emits raw bytes ('Uint8Array' chunks):
        const data_stream: ReadableStream<Uint8Array> = response.body.pipeThrough(decompression_stream);

        // const transformation = new TransformStream({
        //     transform(chunk, controller) {
        //         controller.enqueue(new TextDecoder().decode(chunk));
        //         controller.terminate(); // Stop, first chunk only for test...
        //     },
        // });

        // 'Transformer' object is passed to 'TransformStream' at creation time:
        const transformation = new TransformStream(new Compute_UTM_from_lat_lon);
        const data_stream_: ReadableStream<City> = data_stream.pipeThrough(transformation);
        for await (const city of Chunks(data_stream_.getReader()))
            console.info(city);
    }).catch((error: unknown) => {
        console.error("'Transformation' failed... " + error);
    });
}
Saving Web streams as files
fetch(OpenSLR_org_88.URL).then((response) => {
    // '"body" in response' is always true for a 'Response' object: what matters is whether the body is 'null':
    if (!response.ok || response.body === null)
        throw new Error(`'fetch' returned no usable body (HTTP status ${response.status})...`);
    // 'response.body' is a 'ReadableStream' object:
    console.assert(response.body.constructor.name === 'ReadableStream');
    console.assert(response.body.locked === false /*&& response.body.state === 'readable'*/);
    // Decompressed data arrive as raw bytes ('Uint8Array' chunks):
    const uncompressed_data_stream: ReadableStream<Uint8Array> = response.body.pipeThrough(new DecompressionStream(Compression.GZIP));
    // Conversion from Web 'ReadableStream' to file system 'ReadStream' (https://stackoverflow.com/questions/71509505/how-to-convert-web-stream-to-nodejs-native-stream):
    // TypeScript compilation error (https://stackoverflow.com/questions/63630114/argument-of-type-readablestreamany-is-not-assignable-to-parameter-of-type-r):
    // @ts-ignore
    const source = Readable.fromWeb(uncompressed_data_stream);
    const target = file_system.createWriteStream(OpenSLR_org_88.Archive_file_name);
    // 'pipe' neither forwards errors nor tears down the other side: both ends are handled explicitly:
    source.on('error', (error: Error) => target.destroy(error));
    target.on('error', (error: Error) => {
        source.destroy();
        console.error("Saving archive file failed... " + error);
    });
    // 'finish' on 'target' (unlike 'end' on 'source') fires only once all data have been written to the file,
    // so 'tar' never reads a partially written archive:
    target.on('finish', () => { // Use of 'node-tar' library:
        tar.list({ // Let us the possibility of extracting directory hierarchy inside archive file:
            file: OpenSLR_org_88.Archive_file_name,
            onReadEntry: entry => console.info(entry.path)
        }).catch((error: unknown) => console.error("'tar.list' failed... " + error));
    });
    // Flat uncompressed (archive) file on disk (10,6 GB) is simply saved:
    source.pipe(target);
}).catch((error: unknown) => {
    console.error("'fetch' failed... " + error);
});
Decompression reusing the Node.js 'zlib' module (import * as zlib from 'node:zlib';)
/**
 * Downloads the compressed archive, decompresses it with 'zlib' into a local file, then expands it with 'node-tar'.
 * Each step wraps its own failure in an error naming that step (fetch, zlib, tar.extract).
 */
await (async (): Promise<void> => {
    const response = await fetch(OpenSLR_org_88.URL).catch((error: unknown): never => {
        throw new Error("'fetch' failed... " + error);
    });
    // An error status would feed an error page to 'zlib', failing later with a misleading message:
    if (!response.ok)
        throw new Error(`'fetch' failed... HTTP status ${response.status}`);
    // '"body" in response' is always true for a 'Response' object: what matters is whether the body is 'null':
    if (response.body === null)
        throw new Error("'response.body' is null... ");
    // 'response.body' is a 'ReadableStream' object:
    console.assert(response.body.constructor.name === 'ReadableStream');
    console.assert(response.body.locked === false /*&& response.body.state === 'readable'*/);
    // Conversion from Web 'ReadableStream' to file system 'ReadStream' (https://stackoverflow.com/questions/71509505/how-to-convert-web-stream-to-nodejs-native-stream):
    // @ts-ignore
    const source = Readable.fromWeb(response.body); // Compressed data stream...
    // source.pipe(file_system.createWriteStream('OpenSLR_org_88.tgz')); // Compressed version on local disk...
    const target = file_system.createWriteStream(OpenSLR_org_88.Archive_file_name);
    /** 'zlib'-based decompression (optional because 'tar' does the job later on if compressed) */
    await promisify(pipeline)(source, zlib.createUnzip(), target).catch((error: unknown): never => {
        throw new Error("'zlib' failed... " + error);
    }); // Archive file created...
    // Use of 'node-tar' library:
    await tar.extract({ file: OpenSLR_org_88.Archive_file_name }).catch((error: unknown): never => {
        throw new Error("'tar.extract' failed... " + error);
    }); // Archive file expanded...
})();