in conductor.js [92:126]
/**
 * Fan out one invocation of the current action per element of `array`,
 * coordinated through a redis-backed barrier, then hand over to `collect`.
 *
 * Side effects on `p` (always applied, even on the early-return paths):
 *   - `p.s.state` is set to `index + node.return` (the state to resume at).
 *   - `p.params` is replaced by `{ value: [] }`, or by `{ error }` on failure.
 *
 * @param {object} ctx - destructured as `{ p, node, index }`.
 * @param {object} ctx.p - conductor state; reads `p.params`, `p.s.redis`,
 *   `p.s.openwhisk`, `p.s.stack`.
 * @param {object} ctx.node - current AST node; reads `node.return` and `node.parent`.
 * @param {number} ctx.index - index of the current state.
 * @param {Array} array - one branch is spawned per element.
 * @param {function} it - called as `it(saved, item)`; must return the params
 *   object for a branch, with a `$composer` object already present on it.
 * @returns {Promise|undefined} `undefined` when `array` is empty or redis is
 *   misconfigured; otherwise a promise for the result of `collect(p, barrierId)`,
 *   or of `step(p)` after cleanup if spawning failed.
 */
function fork ({ p, node, index }, array, it) {
  const saved = p.params // save params
  p.s.state = index + node.return // return state
  p.params = { value: [] } // return value
  if (array.length === 0) return

  // `typeof null` is 'object', so null must be rejected explicitly; otherwise
  // reading `redis.uri` below would throw instead of reporting the config error.
  const redis = p.s.redis
  const redisConfigured = typeof redis === 'object' && redis !== null &&
    typeof redis.uri === 'string' &&
    (typeof redis.ca === 'string' || typeof redis.ca === 'undefined')
  if (!redisConfigured) {
    p.params = { error: 'Parallel combinator requires a properly configured redis instance' }
    console.error(p.params.error)
    return
  }

  // Branches receive the current stack with a marker frame pushed on top.
  const stack = [{ marker: true }].concat(p.s.stack)
  const barrierId = require(/* webpackIgnore: true */ 'uuid').v4()
  console.log(`barrierId: ${barrierId}, spawning: ${array.length}`)

  // `wsk` and `db` are declared outside this function and initialised on first use.
  if (!wsk) wsk = openwhisk(p.s.openwhisk)
  if (!db) db = createRedisClient(p)

  return db.lpushAsync(live(barrierId), 42) // push marker
    .then(() => db.expireAsync(live(barrierId), expiration))
    .then(() => Promise.all(array.map((item, position) => {
      const params = it(saved, item) // obtain combinator-specific params for branch invocation
      params.$composer.stack = stack
      params.$composer.redis = p.s.redis
      params.$composer.openwhisk = p.s.openwhisk
      params.$composer.join = { barrierId, position, count: array.length }
      return invoke({ name: process.env.__OW_ACTION_NAME, params }) // invoke branch
        .then(({ activationId }) => { console.log(`barrierId: ${barrierId}, spawned position: ${position} with activationId: ${activationId}`) })
    })))
    // Two-argument `then`: the rejection handler covers failures of the redis
    // setup and of the branch invocations only, not failures inside `collect`.
    .then(() => collect(p, barrierId), error => {
      console.error(error.body || error)
      p.params = { error: `Parallel combinator failed to invoke a composition at AST node root${node.parent} (see log for details)` }
      return db.delAsync(live(barrierId), done(barrierId)) // delete keys
        .then(() => {
          inspect(p)
          return step(p)
        })
    })
}