cloudflared-mirror/awsuploader/directory_upload_manager.go

107 lines
2.6 KiB
Go
Raw Permalink Normal View History

package awsuploader
import (
"os"
"path/filepath"
"time"
"github.com/cloudflare/cloudflared/logger"
)
// DirectoryUploadManager is used to manage file uploads on an interval from a directory
type DirectoryUploadManager struct {
logger logger.Service
uploader Uploader
rootDirectory string
sweepInterval time.Duration
ticker *time.Ticker
shutdownC chan struct{}
workQueue chan string
}
// NewDirectoryUploadManager create a new DirectoryUploadManager
// uploader is an Uploader to use as an actual uploading engine
// directory is the directory to sweep for files to upload
// sweepInterval is how often to iterate the directory and upload the files within
func NewDirectoryUploadManager(logger logger.Service, uploader Uploader, directory string, sweepInterval time.Duration, shutdownC chan struct{}) *DirectoryUploadManager {
workerCount := 10
manager := &DirectoryUploadManager{
logger: logger,
uploader: uploader,
rootDirectory: directory,
sweepInterval: sweepInterval,
shutdownC: shutdownC,
workQueue: make(chan string, workerCount),
}
//start workers
for i := 0; i < workerCount; i++ {
go manager.worker()
}
return manager
}
// Upload a file using the uploader
// This is useful for "out of band" uploads that need to be triggered immediately instead of waiting for the sweep
func (m *DirectoryUploadManager) Upload(filepath string) error {
return m.uploader.Upload(filepath)
}
// Start the upload ticker to walk the directories
func (m *DirectoryUploadManager) Start() {
m.ticker = time.NewTicker(m.sweepInterval)
go m.run()
}
func (m *DirectoryUploadManager) run() {
for {
select {
case <-m.shutdownC:
m.ticker.Stop()
return
case <-m.ticker.C:
m.sweep()
}
}
}
// sweep the directory and kick off uploads
func (m *DirectoryUploadManager) sweep() {
filepath.Walk(m.rootDirectory, func(path string, info os.FileInfo, err error) error {
if err != nil || info.IsDir() {
return nil
}
//30 days ago
retentionTime := 30 * (time.Hour * 24)
checkTime := time.Now().Add(-time.Duration(retentionTime))
//delete the file it is stale
if info.ModTime().Before(checkTime) {
os.Remove(path)
return nil
}
//add the upload to the work queue
go func() {
m.workQueue <- path
}()
return nil
})
}
// worker handles upload requests
func (m *DirectoryUploadManager) worker() {
for {
select {
case <-m.shutdownC:
return
case filepath := <-m.workQueue:
if err := m.Upload(filepath); err != nil {
m.logger.Errorf("Cannot upload file to s3 bucket: %s", err)
} else {
os.Remove(filepath)
}
}
}
}