KrakenHashes Agent Development Guide¶
Table of Contents¶
- Agent Architecture Overview
- Hardware Detection System
- Job Execution Flow
- WebSocket Communication
- File Synchronization
- Metrics Collection
- Adding New Features
- Testing Agents
Agent Architecture Overview¶
The KrakenHashes agent is a distributed compute node that executes password cracking jobs using hashcat. It's built with Go and designed to be cross-platform, supporting various GPU and CPU configurations.
Core Components¶
agent/
├── cmd/agent/ # Entry point and initialization
├── internal/
│ ├── agent/ # Core agent logic and WebSocket connection
│ ├── auth/ # API key and certificate management
│ ├── config/ # Configuration management
│ ├── hardware/ # GPU/CPU detection using hashcat
│ ├── jobs/ # Job execution and hashcat management
│ ├── metrics/ # System metrics collection
│ ├── sync/ # File synchronization with backend
│ └── status/ # Agent status management
└── pkg/debug/ # Debug logging utilities
Agent Lifecycle¶
// From cmd/agent/main.go
func main() {
// 1. Initialize debug logging
debug.Reinitialize()
// 2. Load configuration from .env
cfg := loadConfig()
// 3. Initialize data directories
dataDirs, err := config.GetDataDirs()
// 4. Create metrics collector
collector, err := metrics.New(metrics.Config{
CollectionInterval: time.Duration(cfg.heartbeatInterval) * time.Second,
EnableGPU: true,
})
// 5. Load or register agent credentials
agentID, cert, err := agent.LoadCredentials()
// 6. Create WebSocket connection
conn, err := agent.NewConnection(urlConfig)
// 7. Create job manager with hardware monitor
hwMonitor := conn.GetHardwareMonitor()
jobManager = jobs.NewJobManager(agentConfig, nil, hwMonitor)
// 8. Start connection and maintenance
conn.Start()
// 9. Detect and send device information
conn.DetectAndSendDevices()
}
Hardware Detection System¶
The agent uses hashcat's built-in device detection to identify available compute devices (GPUs and CPUs).
Device Detection Implementation¶
// From internal/hardware/hashcat_detector.go
type HashcatDetector struct {
binaryPath string
dataDirectory string
}
func (d *HashcatDetector) DetectDevices() (*types.DeviceDetectionResult, error) {
// Build hashcat command with device info flags
args := []string{"-I", "--machine-readable", "--quiet"}
cmd := exec.CommandContext(ctx, d.binaryPath, args...)
output, err := cmd.Output()
// Parse hashcat output
devices, backends, err := d.parseHashcatOutput(string(output))
return &types.DeviceDetectionResult{
Devices: devices,
OpenCLBackends: backends,
DetectedAt: time.Now(),
HashcatVersion: d.getHashcatVersion(),
}, nil
}
Device Structure¶
// From internal/hardware/types/device.go
type Device struct {
ID int `json:"id"`
Type string `json:"type"` // "gpu" or "cpu"
Brand string `json:"brand"` // "nvidia", "amd", "intel"
Name string `json:"name"`
Processor string `json:"processor"`
Memory int64 `json:"memory"` // In bytes
DriverVersion string `json:"driver_version"`
// Performance characteristics
PCIeBus string `json:"pcie_bus"`
CoreCount int `json:"core_count"`
ClockSpeed int `json:"clock_speed"` // In MHz
// Runtime state
Enabled bool `json:"enabled"`
}
Hardware Monitor¶
// From internal/hardware/monitor.go
type Monitor struct {
mu sync.RWMutex
devices []types.Device
hashcatDetector *HashcatDetector
}
// Detect devices using hashcat
func (m *Monitor) DetectDevices() (*types.DeviceDetectionResult, error) {
result, err := m.hashcatDetector.DetectDevices()
if err != nil {
return nil, err
}
// Store devices in monitor
m.mu.Lock()
m.devices = result.Devices
m.mu.Unlock()
return result, nil
}
// Get enabled device flags for hashcat (-d parameter)
func (m *Monitor) GetEnabledDeviceFlags() string {
m.mu.RLock()
defer m.mu.RUnlock()
return BuildDeviceFlags(m.devices)
}
Job Execution Flow¶
The agent executes hashcat jobs based on task assignments from the backend.
Job Manager¶
// From internal/jobs/jobs.go
type JobManager struct {
executor *HashcatExecutor
config *config.Config
progressCallback func(*JobProgress)
outputCallback func(taskID string, output string, isError bool)
fileSync *filesync.FileSync
hwMonitor HardwareMonitor
mutex sync.RWMutex
activeJobs map[string]*JobExecution
}
// Process a job assignment from the backend
func (jm *JobManager) ProcessJobAssignment(ctx context.Context, assignmentData []byte) error {
var assignment JobTaskAssignment
err := json.Unmarshal(assignmentData, &assignment)
// 1. Ensure hashlist is available
err = jm.ensureHashlist(ctx, &assignment)
// 2. Ensure rule chunks are available if needed
err = jm.ensureRuleChunks(ctx, &assignment)
// 3. Start job execution
process, err := jm.executor.ExecuteTask(ctx, &assignment)
// 4. Monitor job progress
go jm.monitorJobProgress(ctx, jobExecution)
return nil
}
Hashcat Executor¶
// From internal/jobs/hashcat_executor.go
type HashcatExecutor struct {
dataDirectory string
activeProcesses map[string]*HashcatProcess
mu sync.RWMutex
agentExtraParams string
deviceFlagsCallback func() string
}
// Execute a hashcat task
func (e *HashcatExecutor) ExecuteTask(ctx context.Context, assignment *JobTaskAssignment) (*HashcatProcess, error) {
// 1. Build hashcat command
args := e.buildHashcatCommand(assignment)
// 2. Create process with output capture
cmd := exec.CommandContext(ctx, binaryPath, args...)
// 3. Set up output pipes
stdout, _ := cmd.StdoutPipe()
stderr, _ := cmd.StderrPipe()
// 4. Start the process
err := cmd.Start()
// 5. Monitor output and parse progress
go e.monitorOutput(process, stdout, stderr)
return process, nil
}
// Build hashcat command with all parameters
func (e *HashcatExecutor) buildHashcatCommand(assignment *JobTaskAssignment) []string {
args := []string{
"-m", strconv.Itoa(assignment.HashType),
"-a", strconv.Itoa(assignment.AttackMode),
"--status",
"--status-json",
"--status-timer", strconv.Itoa(assignment.ReportInterval),
}
// Add device flags if callback is set
if e.deviceFlagsCallback != nil {
deviceFlags := e.deviceFlagsCallback()
if deviceFlags != "" {
args = append(args, "-d", deviceFlags)
}
}
// Add skip and limit for keyspace splitting
if assignment.KeyspaceStart > 0 {
args = append(args, "-s", strconv.FormatInt(assignment.KeyspaceStart, 10))
}
if assignment.KeyspaceEnd > assignment.KeyspaceStart {
limit := assignment.KeyspaceEnd - assignment.KeyspaceStart
args = append(args, "-l", strconv.FormatInt(limit, 10))
}
return args
}
Job Progress Tracking¶
// From internal/jobs/types.go
type JobProgress struct {
TaskID string `json:"task_id"`
Status string `json:"status"`
Progress float64 `json:"progress"`
Speed int64 `json:"speed"`
DeviceSpeeds []DeviceSpeed `json:"device_speeds"`
TimeRemaining int `json:"time_remaining"`
KeyspaceProcessed int64 `json:"keyspace_processed"`
CrackedCount int `json:"cracked_count"`
CrackedHashes []CrackedHash `json:"cracked_hashes,omitempty"`
ErrorMessage string `json:"error_message,omitempty"`
}
WebSocket Communication¶
The agent maintains a persistent WebSocket connection with the backend for real-time communication.
Connection Management¶
// From internal/agent/connection.go
type Connection struct {
ws *websocket.Conn
urlConfig *config.URLConfig
hwMonitor *hardware.Monitor
outbound chan *WSMessage
done chan struct{}
isConnected atomic.Bool
tlsConfig *tls.Config
fileSync *filesync.FileSync
jobManager JobManager
}
// WebSocket message types
type WSMessageType string
const (
WSTypeHardwareInfo WSMessageType = "hardware_info"
WSTypeMetrics WSMessageType = "metrics"
WSTypeHeartbeat WSMessageType = "heartbeat"
WSTypeAgentStatus WSMessageType = "agent_status"
WSTypeFileSyncRequest WSMessageType = "file_sync_request"
WSTypeFileSyncResponse WSMessageType = "file_sync_response"
WSTypeTaskAssignment WSMessageType = "task_assignment"
WSTypeJobProgress WSMessageType = "job_progress"
WSTypeJobStop WSMessageType = "job_stop"
WSTypeBenchmarkRequest WSMessageType = "benchmark_request"
WSTypeBenchmarkResult WSMessageType = "benchmark_result"
WSTypeHashcatOutput WSMessageType = "hashcat_output"
WSTypeDeviceDetection WSMessageType = "device_detection"
WSTypeDeviceUpdate WSMessageType = "device_update"
)
Message Handling¶
// From internal/agent/connection.go - readPump method
func (c *Connection) readPump() {
for {
var msg WSMessage
err := c.ws.ReadJSON(&msg)
switch msg.Type {
case WSTypeTaskAssignment:
// Process job assignment
if err := c.jobManager.ProcessJobAssignment(ctx, msg.Payload); err != nil {
debug.Error("Failed to process job assignment: %v", err)
}
case WSTypeFileSyncRequest:
// Handle file sync request
go c.handleFileSyncAsync(requestPayload)
case WSTypeBenchmarkRequest:
// Run speed test
go func() {
totalSpeed, deviceSpeeds, err := executor.RunSpeedTest(ctx, assignment, testDuration)
// Send results back
}()
case WSTypeDeviceUpdate:
// Update device enabled/disabled state
c.hwMonitor.UpdateDeviceStatus(updatePayload.DeviceID, updatePayload.Enabled)
}
}
}
Sending Updates¶
// Send job progress to backend
func (c *Connection) SendJobProgress(progress *jobs.JobProgress) error {
progressJSON, err := json.Marshal(progress)
msg := &WSMessage{
Type: WSTypeJobProgress,
Payload: progressJSON,
Timestamp: time.Now(),
}
select {
case c.outbound <- msg:
return nil
case <-time.After(5 * time.Second):
return fmt.Errorf("failed to queue job progress: channel blocked")
}
}
File Synchronization¶
The agent synchronizes wordlists, rules, and binaries with the backend.
File Sync Implementation¶
// From internal/sync/sync.go
type FileSync struct {
client *http.Client
urlConfig *config.URLConfig
dataDirs *config.DataDirs
sem chan struct{} // Semaphore for concurrent downloads
maxRetries int
apiKey string
agentID string
}
// Download a file from the backend
func (fs *FileSync) DownloadFileFromInfo(ctx context.Context, fileInfo *FileInfo) error {
// 1. Check if file already exists with correct hash
if fs.fileExistsWithHash(fileInfo) {
return nil
}
// 2. Build download URL
url := fs.buildDownloadURL(fileInfo)
// 3. Create authenticated request
req, _ := http.NewRequestWithContext(ctx, "GET", url, nil)
req.Header.Set("X-API-Key", fs.apiKey)
req.Header.Set("X-Agent-ID", fs.agentID)
// 4. Download to temporary file
tempPath := finalPath + ".tmp"
err := fs.downloadToFile(req, tempPath)
// 5. Verify hash
if !fs.verifyFileHash(tempPath, fileInfo.MD5Hash) {
return fmt.Errorf("hash mismatch")
}
// 6. Move to final location
os.Rename(tempPath, finalPath)
// 7. Extract if it's a binary archive
if fileInfo.FileType == "binary" && strings.HasSuffix(fileInfo.Name, ".7z") {
fs.ExtractBinary7z(finalPath, targetDir)
}
return nil
}
Directory Structure¶
// From internal/config/dirs.go
type DataDirs struct {
Binaries string // /path/to/data/binaries
Wordlists string // /path/to/data/wordlists
Rules string // /path/to/data/rules
Hashlists string // /path/to/data/hashlists
}
// Wordlist categories:
// - general/ # Common wordlists
// - specialized/ # Domain-specific lists
// - targeted/ # Custom lists for specific targets
// - custom/ # User-uploaded lists
// Rule categories:
// - hashcat/ # Hashcat rule files
// - john/ # John the Ripper rules
// - custom/ # User-created rules
// - chunks/ # Split rule files for distributed processing
Metrics Collection¶
The agent collects system metrics for monitoring and optimization.
Metrics Collector¶
// From internal/metrics/collector.go
type Collector struct {
interval time.Duration
gpuEnabled bool
}
// Collect system metrics
func (c *Collector) Collect() (*SystemMetrics, error) {
metrics := &SystemMetrics{}
// CPU metrics using gopsutil
percentage, _ := cpu.Percent(time.Second, false)
metrics.CPUUsage = percentage[0]
// Memory metrics
vmem, _ := mem.VirtualMemory()
metrics.MemoryUsage = vmem.UsedPercent
// GPU metrics come from hashcat JSON status
// during job execution
return metrics, nil
}
Metrics Data Structure¶
// From internal/agent/connection.go
type MetricsData struct {
AgentID int `json:"agent_id"`
CollectedAt time.Time `json:"collected_at"`
CPUs []CPUMetrics `json:"cpus"`
GPUs []GPUMetrics `json:"gpus"`
Memory MemoryMetrics `json:"memory"`
Disk []DiskMetrics `json:"disk"`
Network []NetworkMetrics `json:"network"`
Process []ProcessMetrics `json:"process"`
}
Adding New Features¶
1. Adding a New WebSocket Message Type¶
// 1. Define the message type in connection.go
const WSTypeNewFeature WSMessageType = "new_feature"
// 2. Create payload structures
type NewFeatureRequest struct {
Parameter1 string `json:"parameter1"`
Parameter2 int `json:"parameter2"`
}
// 3. Add handler in readPump
case WSTypeNewFeature:
var payload NewFeatureRequest
if err := json.Unmarshal(msg.Payload, &payload); err != nil {
debug.Error("Failed to parse new feature: %v", err)
continue
}
// Handle the feature
go c.handleNewFeature(payload)
// 4. Implement the handler
func (c *Connection) handleNewFeature(payload NewFeatureRequest) {
// Implementation
}
2. Adding Hardware Support¶
// 1. Update device detection in hashcat_detector.go
func (d *HashcatDetector) parseHashcatOutput(output string) ([]types.Device, []types.OpenCLBackend, error) {
// Add parsing for new hardware types
// Example: Detect new GPU vendor
if strings.Contains(line, "NewVendor") {
device.Brand = "newvendor"
device.Type = "gpu"
}
}
// 2. Add device-specific handling
func BuildDeviceFlags(devices []types.Device) string {
// Add logic for new device types
}
3. Adding Job Features¶
// 1. Update JobTaskAssignment structure
type JobTaskAssignment struct {
// Existing fields...
NewFeature string `json:"new_feature"`
}
// 2. Update hashcat command building
func (e *HashcatExecutor) buildHashcatCommand(assignment *JobTaskAssignment) []string {
// Add new hashcat parameters
if assignment.NewFeature != "" {
args = append(args, "--new-feature", assignment.NewFeature)
}
}
// 3. Update progress monitoring if needed
func (e *HashcatExecutor) parseHashcatStatus(status map[string]interface{}) *JobProgress {
// Parse new status fields
}
Testing Agents¶
Unit Testing¶
// Example from agent_test.go
func TestGetAgentID(t *testing.T) {
tests := []struct {
name string
setupFunc func(configDir string) error
expectedID int
wantErr bool
}{
{
name: "successful ID retrieval",
setupFunc: func(configDir string) error {
return auth.SaveAgentKey(configDir, "test-api-key", "456")
},
expectedID: 456,
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tempDir := t.TempDir()
t.Setenv("KH_CONFIG_DIR", tempDir)
if tt.setupFunc != nil {
err := tt.setupFunc(tempDir)
require.NoError(t, err)
}
id, err := GetAgentID()
if tt.wantErr {
assert.Error(t, err)
} else {
assert.NoError(t, err)
assert.Equal(t, tt.expectedID, id)
}
})
}
}
Integration Testing¶
// Test WebSocket connection
func TestWebSocketConnection(t *testing.T) {
// Create test server
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
upgrader := websocket.Upgrader{}
conn, _ := upgrader.Upgrade(w, r, nil)
defer conn.Close()
// Test message exchange
var msg WSMessage
conn.ReadJSON(&msg)
assert.Equal(t, WSTypeHeartbeat, msg.Type)
// Send response
conn.WriteJSON(WSMessage{
Type: WSTypeHeartbeat,
Timestamp: time.Now(),
})
}))
defer server.Close()
// Test connection
conn, err := NewConnection(urlConfig)
assert.NoError(t, err)
err = conn.Start()
assert.NoError(t, err)
}
Mock Testing¶
// From internal/mocks/
type MockJobManager struct {
mock.Mock
}
func (m *MockJobManager) ProcessJobAssignment(ctx context.Context, data []byte) error {
args := m.Called(ctx, data)
return args.Error(0)
}
// Use in tests
func TestJobProcessing(t *testing.T) {
mockJM := new(MockJobManager)
mockJM.On("ProcessJobAssignment", mock.Anything, mock.Anything).Return(nil)
conn := &Connection{
jobManager: mockJM,
}
// Test job processing
msg := WSMessage{
Type: WSTypeTaskAssignment,
Payload: json.RawMessage(`{"task_id": "test-123"}`),
}
// Process message
// Assert expectations
mockJM.AssertExpectations(t)
}
Performance Testing¶
func BenchmarkHashcatCommandBuilding(b *testing.B) {
executor := NewHashcatExecutor("/data")
assignment := &JobTaskAssignment{
TaskID: "bench-task",
HashType: 0,
AttackMode: 0,
WordlistPaths: []string{"wordlist1.txt", "wordlist2.txt"},
RulePaths: []string{"rule1.rule", "rule2.rule"},
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
_ = executor.buildHashcatCommand(assignment)
}
}
Manual Testing¶
# Build and run agent locally
cd agent
go build -o krakenhashes-agent cmd/agent/main.go
# Run with test configuration
./krakenhashes-agent -host localhost:8080 -claim TEST-CLAIM-CODE -debug
# Test specific components
# Device detection
./krakenhashes-agent -test-devices
# File sync
./krakenhashes-agent -test-sync wordlists
# Benchmark
./krakenhashes-agent -test-benchmark -m 0 -a 0
Best Practices¶
-
Error Handling: Always use the debug package for logging
-
Resource Management: Always clean up resources
-
Concurrent Operations: Use proper synchronization
-
Context Usage: Respect context cancellation
-
Configuration: Use environment variables
Troubleshooting¶
Common issues and solutions:
- Certificate Errors: Agent will attempt to renew certificates automatically
- Connection Drops: Automatic reconnection with exponential backoff
- File Sync Failures: Automatic retry with hash verification
- Hashcat Errors: Check device permissions and driver installation
- Memory Issues: Monitor system resources during large jobs
For debugging, enable debug mode: