in conductor.js [315:336]
/**
 * Waits for a barrier to signal completion, then merges the collected results
 * into `p` and resumes execution.
 *
 * Performs a blocking right-pop on the list keyed by `done(barrierId)`.
 * - If an element is popped, the remaining list entries are read, each is
 *   parsed as JSON `{ position, params }` and stored at
 *   `p.params.value[position]`; the `live(barrierId)` and `done(barrierId)`
 *   keys are deleted, and the result of `step(p)` is returned.
 * - If the pop times out (resolves to `null`), `p.s.collect` is set to
 *   `barrierId` and an echo-action descriptor carrying `p.params` and `p.s`
 *   is returned instead.
 *
 * @param {object} p - State object. `p.params.value` is filled in by position
 *   and `p.s.collect` is written on timeout. It is also handed to
 *   `createRedisClient` when no client has been created yet.
 * @param {*} barrierId - Identifier passed to `done()` / `live()` to derive
 *   the keys used for this barrier.
 * @returns {Promise<*>} Resolves to the value of `step(p)` on completion, or
 *   to the echo-action descriptor on timeout.
 */
function collect (p, barrierId) {
  // Create the shared client on first use.
  if (!db) db = createRedisClient(p)

  // Whole seconds left until __OW_DEADLINE (treated as a millisecond
  // timestamp), minus a 5 second margin.
  const remaining = Math.floor((process.env.__OW_DEADLINE - Date.now()) / 1000) - 5
  // Clamp to at least 1 second. Math.max() alone is not enough: a missing or
  // non-numeric deadline makes `remaining` NaN, and Math.max(NaN, 1) is NaN,
  // which is not a valid timeout. Fall back to the minimum in that case.
  const timeout = Number.isFinite(remaining) ? Math.max(remaining, 1) : 1

  console.log(`barrierId: ${barrierId}, waiting with timeout: ${timeout}s`)
  return db.brpopAsync(done(barrierId), timeout) // pop marker
    .then(marker => {
      console.log(`barrierId: ${barrierId}, done waiting`)

      if (marker === null) { // timeout
        p.s.collect = barrierId
        console.log(`barrierId: ${barrierId}, handling timeout`)
        return { method: 'action', action: '/whisk.system/utils/echo', params: p.params, state: { $composer: p.s } }
      }

      return db.lrangeAsync(done(barrierId), 0, -1)
        .then(entries => {
          // Parse every entry before touching `p`, so a malformed entry
          // rejects the chain without leaving a partially filled result.
          // The explicit arrow keeps map's index from reaching JSON.parse
          // as its `reviver` argument.
          const parsed = entries.map(entry => JSON.parse(entry))
          parsed.forEach(({ position, params }) => {
            p.params.value[position] = params
          })
        })
        .then(() => db.delAsync(live(barrierId), done(barrierId))) // delete keys
        .then(() => {
          inspect(p)
          return step(p)
        })
    })
}