188 lines
5.1 KiB
JavaScript
188 lines
5.1 KiB
JavaScript
|
'use strict'
|
||
|
|
||
|
const util = require('util')
|
||
|
|
||
|
const contentPath = require('./path')
|
||
|
const fixOwner = require('../util/fix-owner')
|
||
|
const fs = require('graceful-fs')
|
||
|
const moveFile = require('../util/move-file')
|
||
|
const Minipass = require('minipass')
|
||
|
const Pipeline = require('minipass-pipeline')
|
||
|
const Flush = require('minipass-flush')
|
||
|
const path = require('path')
|
||
|
const rimraf = util.promisify(require('rimraf'))
|
||
|
const ssri = require('ssri')
|
||
|
const uniqueFilename = require('unique-filename')
|
||
|
const { disposer } = require('./../util/disposer')
|
||
|
const fsm = require('fs-minipass')
|
||
|
|
||
|
const writeFile = util.promisify(fs.writeFile)
|
||
|
|
||
|
module.exports = write
|
||
|
|
||
|
function write (cache, data, opts) {
|
||
|
opts = opts || {}
|
||
|
if (opts.algorithms && opts.algorithms.length > 1) {
|
||
|
throw new Error('opts.algorithms only supports a single algorithm for now')
|
||
|
}
|
||
|
if (typeof opts.size === 'number' && data.length !== opts.size) {
|
||
|
return Promise.reject(sizeError(opts.size, data.length))
|
||
|
}
|
||
|
const sri = ssri.fromData(data, {
|
||
|
algorithms: opts.algorithms
|
||
|
})
|
||
|
if (opts.integrity && !ssri.checkData(data, opts.integrity, opts)) {
|
||
|
return Promise.reject(checksumError(opts.integrity, sri))
|
||
|
}
|
||
|
|
||
|
return disposer(makeTmp(cache, opts), makeTmpDisposer,
|
||
|
(tmp) => {
|
||
|
return writeFile(tmp.target, data, { flag: 'wx' })
|
||
|
.then(() => moveToDestination(tmp, cache, sri, opts))
|
||
|
})
|
||
|
.then(() => ({ integrity: sri, size: data.length }))
|
||
|
}
|
||
|
|
||
|
module.exports.stream = writeStream
|
||
|
|
||
|
// writes proxied to the 'inputStream' that is passed to the Promise
|
||
|
// 'end' is deferred until content is handled.
|
||
|
class CacacheWriteStream extends Flush {
|
||
|
constructor (cache, opts) {
|
||
|
super()
|
||
|
this.opts = opts
|
||
|
this.cache = cache
|
||
|
this.inputStream = new Minipass()
|
||
|
this.inputStream.on('error', er => this.emit('error', er))
|
||
|
this.inputStream.on('drain', () => this.emit('drain'))
|
||
|
this.handleContentP = null
|
||
|
}
|
||
|
|
||
|
write (chunk, encoding, cb) {
|
||
|
if (!this.handleContentP) {
|
||
|
this.handleContentP = handleContent(
|
||
|
this.inputStream,
|
||
|
this.cache,
|
||
|
this.opts
|
||
|
)
|
||
|
}
|
||
|
return this.inputStream.write(chunk, encoding, cb)
|
||
|
}
|
||
|
|
||
|
flush (cb) {
|
||
|
this.inputStream.end(() => {
|
||
|
if (!this.handleContentP) {
|
||
|
const e = new Error('Cache input stream was empty')
|
||
|
e.code = 'ENODATA'
|
||
|
// empty streams are probably emitting end right away.
|
||
|
// defer this one tick by rejecting a promise on it.
|
||
|
return Promise.reject(e).catch(cb)
|
||
|
}
|
||
|
this.handleContentP.then(
|
||
|
(res) => {
|
||
|
res.integrity && this.emit('integrity', res.integrity)
|
||
|
res.size !== null && this.emit('size', res.size)
|
||
|
cb()
|
||
|
},
|
||
|
(er) => cb(er)
|
||
|
)
|
||
|
})
|
||
|
}
|
||
|
}
|
||
|
|
||
|
function writeStream (cache, opts) {
|
||
|
opts = opts || {}
|
||
|
return new CacacheWriteStream(cache, opts)
|
||
|
}
|
||
|
|
||
|
function handleContent (inputStream, cache, opts) {
|
||
|
return disposer(makeTmp(cache, opts), makeTmpDisposer, (tmp) => {
|
||
|
return pipeToTmp(inputStream, cache, tmp.target, opts)
|
||
|
.then((res) => {
|
||
|
return moveToDestination(
|
||
|
tmp,
|
||
|
cache,
|
||
|
res.integrity,
|
||
|
opts
|
||
|
).then(() => res)
|
||
|
})
|
||
|
})
|
||
|
}
|
||
|
|
||
|
function pipeToTmp (inputStream, cache, tmpTarget, opts) {
|
||
|
let integrity
|
||
|
let size
|
||
|
const hashStream = ssri.integrityStream({
|
||
|
integrity: opts.integrity,
|
||
|
algorithms: opts.algorithms,
|
||
|
size: opts.size
|
||
|
})
|
||
|
hashStream.on('integrity', i => { integrity = i })
|
||
|
hashStream.on('size', s => { size = s })
|
||
|
|
||
|
const outStream = new fsm.WriteStream(tmpTarget, {
|
||
|
flags: 'wx'
|
||
|
})
|
||
|
|
||
|
// NB: this can throw if the hashStream has a problem with
|
||
|
// it, and the data is fully written. but pipeToTmp is only
|
||
|
// called in promisory contexts where that is handled.
|
||
|
const pipeline = new Pipeline(
|
||
|
inputStream,
|
||
|
hashStream,
|
||
|
outStream
|
||
|
)
|
||
|
|
||
|
return pipeline.promise()
|
||
|
.then(() => ({ integrity, size }))
|
||
|
.catch(er => rimraf(tmpTarget).then(() => { throw er }))
|
||
|
}
|
||
|
|
||
|
function makeTmp (cache, opts) {
|
||
|
const tmpTarget = uniqueFilename(path.join(cache, 'tmp'), opts.tmpPrefix)
|
||
|
return fixOwner.mkdirfix(cache, path.dirname(tmpTarget)).then(() => ({
|
||
|
target: tmpTarget,
|
||
|
moved: false
|
||
|
}))
|
||
|
}
|
||
|
|
||
|
function makeTmpDisposer (tmp) {
|
||
|
if (tmp.moved) {
|
||
|
return Promise.resolve()
|
||
|
}
|
||
|
return rimraf(tmp.target)
|
||
|
}
|
||
|
|
||
|
function moveToDestination (tmp, cache, sri, opts) {
|
||
|
const destination = contentPath(cache, sri)
|
||
|
const destDir = path.dirname(destination)
|
||
|
|
||
|
return fixOwner
|
||
|
.mkdirfix(cache, destDir)
|
||
|
.then(() => {
|
||
|
return moveFile(tmp.target, destination)
|
||
|
})
|
||
|
.then(() => {
|
||
|
tmp.moved = true
|
||
|
return fixOwner.chownr(cache, destination)
|
||
|
})
|
||
|
}
|
||
|
|
||
|
function sizeError (expected, found) {
|
||
|
const err = new Error(`Bad data size: expected inserted data to be ${expected} bytes, but got ${found} instead`)
|
||
|
err.expected = expected
|
||
|
err.found = found
|
||
|
err.code = 'EBADSIZE'
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
function checksumError (expected, found) {
|
||
|
const err = new Error(`Integrity check failed:
|
||
|
Wanted: ${expected}
|
||
|
Found: ${found}`)
|
||
|
err.code = 'EINTEGRITY'
|
||
|
err.expected = expected
|
||
|
err.found = found
|
||
|
return err
|
||
|
}
|