Building Custom Workers
The built-in Video Service handles FFmpeg and HandBrake transcoding out of the box. For specialized processing needs—audio enhancement, proprietary tools, machine learning models, watermarking—you can build custom workers using the MediaMoth Go library.
Why Build Custom Workers?
Custom workers let you integrate any processing logic into your MediaMoth pipelines:
- Audio processing - Noise reduction, mastering, format conversion with specialized tools
- Image manipulation - Thumbnail generation, watermarking, metadata extraction
- AI/ML integration - Upscaling, object detection, content moderation, transcription
- Proprietary tools - Integrate your existing encoding or enhancement software
- Third-party APIs - Call external services for processing steps
- Custom logic - Implement domain-specific workflows unique to your use case
Once deployed, your custom worker automatically registers with the platform and appears as an available node type when designing pipelines.
How Workers Integrate
Workers communicate with MediaMoth through the job queue (Kafka) and register their capabilities with the Workflow Service. When you create a pipeline that includes a step matching your worker's type, MediaMoth automatically routes jobs to your worker instances.
The worker lifecycle:
- Worker starts and registers its capabilities (what types of jobs it can handle)
- Worker polls the job queue for matching work
- When a job arrives, the worker processes the media file
- Worker reports progress and results back to the platform
- MediaMoth triggers the next pipeline step automatically
Multiple instances of the same worker type can run simultaneously—MediaMoth distributes work across all available workers automatically.
Building a Worker
The process for creating a custom worker:
1. Import the MediaMoth Library
import (
    "github.com/mediamoth/mediamoth/pkg/worker"
    // Your processing dependencies
)
2. Implement Your Processing Logic
Create a processor that implements the required interface. Your processor receives input files, performs transformations, and produces output files.
type MyProcessor struct {
    // Your configuration
}

func (p *MyProcessor) Process(ctx context.Context, job *worker.Job) error {
    // Read input file from job.InputPath
    // Perform your processing
    // Write output to job.OutputPath
    // Report progress via job.UpdateProgress()
    return nil
}
3. Define Worker Capabilities
Specify what types of jobs your worker handles and what parameters it accepts:
capabilities := worker.Capabilities{
    Type:             "audio-enhancement",
    SupportedFormats: []string{"mp3", "wav", "flac"},
    Parameters: []worker.Parameter{
        {Name: "noise_reduction", Type: "boolean", Default: true},
        {Name: "normalize", Type: "boolean", Default: true},
        {Name: "target_db", Type: "float", Default: -16.0},
    },
}
4. Start the Worker
func main() {
    w := worker.New(
        &MyProcessor{},
        capabilities,
        worker.WithKafkaBrokers([]string{"localhost:9092"}),
        worker.WithPostgresURL("postgres://..."),
    )
    w.Start()
}
5. Deploy and Scale
Package your worker as a Docker container and deploy it alongside your MediaMoth infrastructure. Point it at your shared Kafka and PostgreSQL instances via configuration.
To scale processing capacity, deploy more instances. Each instance will poll for work independently—MediaMoth handles load distribution automatically.
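In practice, a containerized worker reads its connection settings from the environment at startup. Below is a minimal sketch assuming the KAFKA_BROKERS and POSTGRES_URL variables listed under Configuration and Environment (with brokers given as a comma-separated list, as in that example); the option functions are the ones shown in step 4.
import (
    "os"
    "strings"

    "github.com/mediamoth/mediamoth/pkg/worker"
)

func main() {
    // Connection settings come from the container environment,
    // so the same image can run unchanged in any deployment.
    brokers := strings.Split(os.Getenv("KAFKA_BROKERS"), ",")

    w := worker.New(
        &MyProcessor{}, // processor from step 2
        capabilities,   // capabilities from step 3
        worker.WithKafkaBrokers(brokers),
        worker.WithPostgresURL(os.Getenv("POSTGRES_URL")),
    )
    w.Start()
}
Because every replica reads the same configuration, scaling out is simply a matter of starting more containers from the same image.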
Worker Best Practices
- Handle errors gracefully - Report failures with clear error messages. MediaMoth will mark jobs as failed and track the error for debugging.
- Report progress - For long-running jobs, update progress periodically so users can monitor status in real-time.
- Validate inputs - Check that input files match expected formats before processing. Fail fast with clear error messages.
- Clean up temporary files - Remove intermediate files after processing to avoid filling disk space.
- Make processing idempotent - If a job fails and retries, ensure it can safely run again without corruption.
- Log thoroughly - Detailed logs help debug processing issues. MediaMoth aggregates worker logs for troubleshooting.
- Support graceful shutdown - Listen for shutdown signals and finish current jobs before exiting to avoid partial processing (see the sketch below).
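To illustrate the last point, a worker's entry point can trap SIGINT and SIGTERM and drain in-flight work before exiting. This is a rough sketch, not the library's documented API: Stop is a hypothetical name for whatever shutdown hook the worker package actually exposes.
import (
    "os"
    "os/signal"
    "syscall"

    "github.com/mediamoth/mediamoth/pkg/worker"
)

func main() {
    w := worker.New(
        &MyProcessor{},
        capabilities,
        worker.WithKafkaBrokers([]string{"localhost:9092"}),
        worker.WithPostgresURL("postgres://..."),
    )

    // Run the worker in the background so main can watch for OS signals.
    go w.Start()

    // Block until the orchestrator (Docker, Kubernetes, systemd) asks us to stop.
    sigs := make(chan os.Signal, 1)
    signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
    <-sigs

    // Hypothetical drain hook: finish the current job, stop polling for new
    // ones, then return so the process exits without partial output.
    w.Stop()
}
If a job is interrupted anyway, the idempotency practice above ensures the retried run can safely start over.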
Example: Audio Enhancement Worker
Here's a conceptual example of an audio enhancement worker:
type AudioEnhancer struct {
    config Config
}

func (a *AudioEnhancer) Process(ctx context.Context, job *worker.Job) error {
    // Load audio file
    audio, err := loadAudioFile(job.InputPath)
    if err != nil {
        return fmt.Errorf("failed to load audio: %w", err)
    }
    job.UpdateProgress(10) // Report initial progress

    // Apply noise reduction if enabled (comma-ok avoids a panic on a missing parameter)
    if enabled, ok := job.Parameters["noise_reduction"].(bool); ok && enabled {
        audio = applyNoiseReduction(audio)
        job.UpdateProgress(40)
    }

    // Normalize audio if enabled
    if enabled, ok := job.Parameters["normalize"].(bool); ok && enabled {
        targetDB := -16.0 // default declared in the worker capabilities
        if v, ok := job.Parameters["target_db"].(float64); ok {
            targetDB = v
        }
        audio = normalizeAudio(audio, targetDB)
        job.UpdateProgress(70)
    }

    // Export processed audio
    if err := exportAudioFile(audio, job.OutputPath); err != nil {
        return fmt.Errorf("failed to export audio: %w", err)
    }

    job.UpdateProgress(100)
    return nil
}
Configuration and Environment
Workers need access to the same infrastructure as core MediaMoth services:
- Kafka brokers - For job queue communication
- PostgreSQL - For job state tracking
- Redis - For distributed coordination (if needed)
- Storage - Access to input/output file locations (local filesystem or shared storage)
Pass these via environment variables or configuration files:
KAFKA_BROKERS=kafka1:9092,kafka2:9092
POSTGRES_URL=postgres://user:pass@db:5432/mediamoth
REDIS_URL=redis://cache:6379
STORAGE_PATH=/mnt/media
Next Steps
- API Reference - Detailed worker API documentation
- Service Nodes - Understanding the built-in processing nodes
- Pipeline Documentation - How workers fit into pipeline design
- Deployment Guide - Deploying workers in production