function fork()

in conductor.js [92:126]


  function fork ({ p, node, index }, array, it) {
    const saved = p.params // save params
    p.s.state = index + node.return // return state
    p.params = { value: [] } // return value
    if (array.length === 0) return
    if (typeof p.s.redis !== 'object' || typeof p.s.redis.uri !== 'string' || (typeof p.s.redis.ca !== 'string' && typeof p.s.redis.ca !== 'undefined')) {
      p.params = { error: 'Parallel combinator requires a properly configured redis instance' }
      console.error(p.params.error)
      return
    }
    const stack = [{ marker: true }].concat(p.s.stack)
    const barrierId = require(/* webpackIgnore: true */ 'uuid').v4()
    console.log(`barrierId: ${barrierId}, spawning: ${array.length}`)
    if (!wsk) wsk = openwhisk(p.s.openwhisk)
    if (!db) db = createRedisClient(p)
    return db.lpushAsync(live(barrierId), 42) // push marker
      .then(() => db.expireAsync(live(barrierId), expiration))
      .then(() => Promise.all(array.map((item, position) => {
        const params = it(saved, item) // obtain combinator-specific params for branch invocation
        params.$composer.stack = stack
        params.$composer.redis = p.s.redis
        params.$composer.openwhisk = p.s.openwhisk
        params.$composer.join = { barrierId, position, count: array.length }
        return invoke({ name: process.env.__OW_ACTION_NAME, params }) // invoke branch
          .then(({ activationId }) => { console.log(`barrierId: ${barrierId}, spawned position: ${position} with activationId: ${activationId}`) })
      }))).then(() => collect(p, barrierId), error => {
        console.error(error.body || error)
        p.params = { error: `Parallel combinator failed to invoke a composition at AST node root${node.parent} (see log for details)` }
        return db.delAsync(live(barrierId), done(barrierId)) // delete keys
          .then(() => {
            inspect(p)
            return step(p)
          })
      })
  }