How to use the pipeline function from highland

Find comprehensive JavaScript highland.pipeline code examples handpicked from public code repositorys.

highland.pipeline creates a new stream by piping an input Highland stream through a sequence of Highland functions.

129
130
131
132
133
134
135
136
137
        break;
      }
    }
  }; // detailChomper

  return H.pipeline(H.consume(detailChomper));
}

module.exports = detailing;
fork icon2
star icon17
watch icon8

71
72
73
74
75
76
77
78
79
        return emmy.cachedTracks[parseInt(name, 16)];
    } else {
      return undefined;
    }
  };
  return H.pipeline(H.doto(emmyPhone));
}

module.exports = emmyiser;
fork icon2
star icon17
watch icon8

+ 11 other calls in file

How does highland.pipeline work?

highland.pipeline is a method in the Highland.js library that allows you to compose a series of Highland streams, piping the output of one into the input of the next.

It takes multiple stream-producing functions as arguments, and returns a new function that can be used to create a pipeline.

When the pipeline function is called with an initial value, it creates a stream from that value and passes it through the pipeline, producing a final output stream.

172
173
174
175
176
177
178
179
180
181
  }
  return parsed;
};

const processingPipeline = function(filterErrors) {
  return _h.pipeline(
    _h.invoke('toString', ['utf8']),
    _h.through(parseMultiPart()),
    _h.map(parseHttpResponse),
    _h.map(function(doc) {
fork icon2
star icon8
watch icon3

+ 7 other calls in file

1
2
3
4
5
6
7
8
9
10
var h = require('highland');

var streamFromRecipe = require('./streamFromRecipe');

module.exports = function(recipe, connections, codecs, channels) {
  return h.pipeline(function(stream) {
    return stream
    .consume(function (err, file, push, next) {
      if (err) {
        push(err);
fork icon3
star icon2
watch icon2

Ai Example

1
2
3
4
5
6
7
8
const _ = require("highland");

const pipeline = _.pipeline(
  _.filter((x) => x % 2 === 0), // filter out odd numbers
  _.map((x) => x * 2) // double each remaining number
);

_([1, 2, 3, 4, 5]).through(pipeline); // apply the pipeline to the stream.toArray(result => console.log(result)); // output the result: [4, 8]

In this example, the _.pipeline function is used to create a pipeline consisting of two functions: _.filter and _.map. The _.filter function filters out any odd numbers from the input stream, while the _.map function doubles each of the remaining even numbers. The resulting transformed stream is output to the console using the toArray method.

9
10
11
12
13
14
15
16
17
18
19
function createStream (format, options) {
  let ogr
  options = options || {}
  options.path = `${options.tempPath || '.'}/${random.generate()}`
  mkdirp.sync(options.path)
  const output = _.pipeline(stream => {
    // init VRT stream and attach listeners otherwise the error event will be missed
    const through = _()


    stream
fork icon1
star icon14
watch icon7

3
4
5
6
7
8
9
10
11
12
13
const _ = require('highland')


function createStream (options) {
  const start = '{"type":"FeatureCollection","features":['
  const end = ']}'
  const readStream = _.pipeline(s => {
    const features = options && options.json ? _(s).compact().map(JSON.stringify) : _(s).compact()
    return _([start])
      .concat(features.intersperse(','))
      .append(end)
fork icon1
star icon14
watch icon0

34
35
36
37
38
39
40
41
42
43

/**
 * Return a pipeline for use in non-Highland situations:
 */

return h.pipeline(stream => {
  var client = new elasticsearch.Client(_.clone(opt));

  /**
   * The first part of the pipeline applies rate limiting (if necessary)...
fork icon1
star icon1
watch icon3

+ 5 other calls in file

21
22
23
24
25
26
27
28
29
30

    return Combinatorics.cartesianProduct(x, y).toArray();
}

module.exports = function() {
    return H.pipeline(
        H.collect(),
        H.consume(function (err, x, push, next){
            if (err) {
                // pass errors along the stream and consume next value
fork icon1
star icon0
watch icon7

33
34
35
36
37
38
39
40
41
42
    // Return a promise or wrap a value in a promise
    return Promise.resolve(value);
}

function pipeline(args) {
    return $.pipeline.apply(null, args);
}

/**
 * Use the dataprocess as a pipeline running a maximum of N number of activities
fork icon1
star icon0
watch icon17

+ 7 other calls in file

227
228
229
230
231
232
233
234
235
236
 * can be piped to return all the xrefs available for each entity reference.
 *
 * @return {Stream} entityReferenceToXrefsSetTransformationStream
 */
function createStream() {
  return highland.pipeline(function(sourceStream) {
    var options = instance.options || {};
    var specifiedEntityReference;
    // normalize the provided entity reference
    return highland(sourceStream)
fork icon1
star icon0
watch icon1

+ 3 other calls in file

378
379
380
381
382
383
384
385
386
387
if (config.recordType === 'export') return exportEvents(stream, config);
if (config.recordType === 'peopleExport') return exportProfiles(stream, config);

const flush = _.wrapCallback(callbackify(flushToMixpanel));

const mpPipeline = _.pipeline(
	// * transforms
	_.map((data) => {
		config.recordsProcessed++;
		if (config.fixData) {
fork icon0
star icon13
watch icon6

+ 57 other calls in file

94
95
96
97
98
99
100
        }
      }
      if (running) { next(); }
    }
  }
  return H.pipeline(H.consume(wavWatch));
}
fork icon2
star icon12
watch icon4

162
163
164
165
166
167
168
        push(null, packet);
      }
      next();
    }
  };
  return H.pipeline(H.consume(grainMuncher));
}
fork icon2
star icon12
watch icon4

61
62
63
64
65
66
67
        }
        next();
      }
    }
  }
  return H.pipeline(H.consume(packetSender));
}
fork icon2
star icon12
watch icon4

2
3
4
5
6
7
8
9
10
11
    ld = require('lodash');

function returnMultiple(dataStream) {
  const wrapper = _();
  const pipelineFunctions = Array.prototype.slice.call(arguments, 1);
  let resultPipeline = _.pipeline.apply(_, pipelineFunctions);
  if (!_.isStream(dataStream)) {
    // TODO (2016-02-21 Jonathan) Proper idiomatic highland error handling here
    throw Error('dataStream is not a stream');
  }
fork icon0
star icon2
watch icon2

+ 3 other calls in file

1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
let maxDelay = 20;

it("should work when stream is fast", function(done) {
  let source = Rx.Observable.from(input);

  let transformStream = hl.pipeline(function(s) {
    return s.consume(function(err, x, push, next) {
      if (err) {
        // pass errors along the stream and consume next value
        push(err);
fork icon0
star icon1
watch icon3

+ 4 other calls in file

185
186
187
188
189
190
191
192
193
194
    streams = hl.concat.apply(this, streams)
  } else {
    streams = hl(streams[0])
  }
  return streams.pipe(
    hl.pipeline(
      ndjson.parse(),
      hl.filter(function (data) { return data.timestamp >= start && (!end || data.timestamp < end) })
    ))
}
fork icon0
star icon1
watch icon2

10
11
12
13
14
15
16
17
18
19
20
//   }
// })


const batchWrite = ({
  TableName, batch, parallel = 1,
}) => __.pipeline(
  __.map(wrapPutRequest),
  __.batch(batch),
  __.map(defaultDocumentClient.wrapBatchWriteRequest(TableName)),
  __.map(batch => defaultDocumentClient.batchWrite(batch)),
fork icon0
star icon1
watch icon0

+ 2 other calls in file

129
130
131
132
133
134
135
136
137
138
if (!filtered && !isGeohash) return _();
// if the query is actually filtered then we use Winnow, otherwise it's a noop
var winnower = filtered ? Winnow.prepareQuery(options) : function (feature) {
  return feature;
};
var output = _.pipeline(function (stream) {
  return stream.pipe(FeatureParser.parse()).stopOnError(function (e) {
    return output.emit('error', e);
  }).map(JSON.parse).stopOnError(function (e) {
    return output.emit('error', e);
fork icon0
star icon0
watch icon2

+ 853 other calls in file

8
9
10
11
12
13
14
15
16
17
18
19
const RE_INCLUDE = /include\s*\(\s*["']([^"']+)["']\s*\)\s*;/g;


function Wrapper(begin, end) {
    var isStarted = false;


    return _.pipeline(
        _.consume(function (err, chunk, push, next) {
            if (err) {
                push(err);
                next();
fork icon0
star icon0
watch icon0

+ 7 other calls in file