changemaker.lite/termux-sms/termux-sms-api-server.py
bunker-admin 9321aeb263 Move SMS phone bridge from campaign_connector submodule into main repo
Consolidates the Termux SMS server code (previously in a separate
campaign_connector git submodule) into termux-sms/ at repo root.
Updates phone clone commands to use sparse checkout so only the
termux-sms/ directory is downloaded onto the Android device.

Bunker Admin
2026-03-31 11:04:14 -06:00

747 lines
26 KiB
Python

#!/usr/bin/env python3
"""
Termux SMS API Server
Bridges SMS Campaign Manager (Ubuntu) with Termux API (Android)
This server runs on the Android device in Termux and provides REST API
endpoints for the main SMS Campaign Manager to send messages using
native Android SMS capabilities instead of ADB automation.
All endpoints require API key authentication via X-API-Key header.
Localhost requests (127.0.0.1, ::1) are exempt for watchdog health checks.
"""
from flask import Flask, request, jsonify
import subprocess
import json
import time
import logging
import hmac
import hashlib
import os
from datetime import datetime
from typing import Dict, List, Optional, Any
# Configure logging — ensure log directory exists before creating FileHandler
_log_dir = os.path.expanduser('~/logs')
os.makedirs(_log_dir, exist_ok=True)
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(os.path.join(_log_dir, 'sms-api.log')),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
app = Flask(__name__)
# Configuration
CONFIG = {
'SECRET_KEY': os.environ.get('SMS_API_SECRET') or os.environ.get('TERMUX_API_KEY', ''),
'MAX_MESSAGE_LENGTH': 1600, # Increased from 160 to support longer messages (SMS can be up to 1600 chars)
'RATE_LIMIT_DELAY': 1.0, # Reduced from 2.0 to 1.0 seconds between messages for faster campaigns
'ALLOWED_COMMANDS': [
'termux-sms-send',
'termux-sms-list',
'termux-battery-status',
'termux-location',
'termux-notification',
'termux-contact-list'
]
}
class SMSApiServer:
"""Main SMS API server class"""
def __init__(self):
self.last_send_time = 0
self.message_count = 0
self.start_time = time.time()
def authenticate_request(self, request_data: str, signature: str) -> bool:
"""Verify HMAC signature for request authentication"""
try:
expected_signature = hmac.new(
CONFIG['SECRET_KEY'].encode(),
request_data.encode(),
hashlib.sha256
).hexdigest()
return hmac.compare_digest(signature, expected_signature)
except Exception as e:
logger.error(f"Authentication error: {e}")
return False
def execute_termux_command(self, command: List[str]) -> Dict[str, Any]:
"""Execute Termux API command with error handling"""
if not command or command[0] not in CONFIG['ALLOWED_COMMANDS']:
return {'success': False, 'error': 'Command not allowed'}
try:
logger.info(f"Executing: {' '.join(command)}")
result = subprocess.run(
command,
capture_output=True,
text=True,
timeout=30
)
return {
'success': result.returncode == 0,
'stdout': result.stdout.strip(),
'stderr': result.stderr.strip(),
'return_code': result.returncode
}
except subprocess.TimeoutExpired:
return {'success': False, 'error': 'Command timeout'}
except Exception as e:
return {'success': False, 'error': str(e)}
def get_sms_history(self, phone: Optional[str] = None, limit: int = 100) -> Dict[str, Any]:
"""Get SMS history for a specific phone number or all messages"""
try:
command = ['termux-sms-list']
if limit:
command.extend(['-l', str(limit)])
result = self.execute_termux_command(command)
if result['success'] and result['stdout']:
try:
messages = json.loads(result['stdout'])
# Filter by phone number if specified
if phone:
# Clean phone number for comparison
clean_phone = phone.replace('+', '').replace('-', '').replace(' ', '').replace('(', '').replace(')', '')
messages = [msg for msg in messages
if msg.get('number', '').replace('+', '').replace('-', '').replace(' ', '').replace('(', '').replace(')', '') == clean_phone]
return {
'success': True,
'messages': messages,
'count': len(messages)
}
except json.JSONDecodeError:
return {'success': False, 'error': 'Failed to parse SMS data'}
return {'success': False, 'error': 'Failed to retrieve SMS history'}
except Exception as e:
logger.error(f"Error getting SMS history: {e}")
return {'success': False, 'error': str(e)}
def get_contact_name(self, phone: str) -> Optional[str]:
"""Get contact name from phone's contact list"""
try:
# Use termux-contact-list command if available
result = self.execute_termux_command(['termux-contact-list'])
if result['success'] and result['stdout']:
try:
contacts = json.loads(result['stdout'])
clean_phone = phone.replace('+', '').replace('-', '').replace(' ', '').replace('(', '').replace(')', '')
for contact in contacts:
if 'phoneNumbers' in contact:
for phone_entry in contact['phoneNumbers']:
contact_phone = phone_entry.get('number', '').replace('+', '').replace('-', '').replace(' ', '').replace('(', '').replace(')', '')
if contact_phone == clean_phone:
return contact.get('name')
except json.JSONDecodeError:
pass
return None
except Exception as e:
logger.error(f"Error getting contact name for {phone}: {e}")
return None
def rate_limit_check(self) -> bool:
"""Check if enough time has passed since last message"""
current_time = time.time()
if current_time - self.last_send_time < CONFIG['RATE_LIMIT_DELAY']:
return False
self.last_send_time = current_time
return True
def send_sms(self, phone: str, message: str) -> Dict[str, Any]:
"""Send SMS using Termux API"""
# Input validation
if not phone or not message:
error_msg = 'Phone and message required'
logger.error(f"SMS validation failed: {error_msg}")
return {'success': False, 'error': error_msg}
# Clean phone number
clean_phone = phone.replace('+', '').replace('-', '').replace(' ', '').replace('(', '').replace(')', '')
if len(message) > CONFIG['MAX_MESSAGE_LENGTH']:
error_msg = f'Message too long ({len(message)} chars, max {CONFIG["MAX_MESSAGE_LENGTH"]})'
logger.error(f"SMS validation failed: {error_msg}")
return {'success': False, 'error': error_msg}
# Rate limiting
if not self.rate_limit_check():
error_msg = 'Rate limit exceeded, please wait'
logger.warning(f"SMS rate limited for {phone}")
return {'success': False, 'error': error_msg}
# Log the SMS attempt
logger.info(f"Attempting to send SMS to {phone} (length: {len(message)} chars)")
# Execute SMS send command
command = ['termux-sms-send', '-n', clean_phone, message]
result = self.execute_termux_command(command)
if result['success']:
self.message_count += 1
logger.info(f"SMS sent successfully to {phone}: {message[:50]}...")
# Send confirmation notification
self.execute_termux_command([
'termux-notification',
'--title', 'SMS Sent',
'--content', f'Message sent to {phone}'
])
else:
logger.error(f"SMS send failed to {phone}: {result.get('error', 'Unknown error')}")
return {
'success': result['success'],
'error': result.get('error') or result.get('stderr'),
'timestamp': datetime.now().isoformat(),
'phone': phone,
'message_length': len(message),
'total_sent': self.message_count
}
# Global server instance
sms_server = SMSApiServer()
def verify_api_key():
"""Verify API key from request headers"""
api_key = request.headers.get('X-API-Key') or request.headers.get('Authorization', '').replace('Bearer ', '')
expected_key = CONFIG['SECRET_KEY']
if not api_key:
return False
if not hmac.compare_digest(api_key, expected_key):
return False
return True
# ---------------------------------------------------------------------------
# Global auth: every request from a remote client must provide a valid API key.
# Localhost (127.0.0.1 / ::1) is exempt so the watchdog health check works.
# ---------------------------------------------------------------------------
@app.before_request
def require_api_key():
"""Enforce API key authentication on all endpoints for remote clients."""
if request.remote_addr in ('127.0.0.1', '::1'):
return None # allow localhost (watchdog health checks)
if not verify_api_key():
logger.warning(f"Auth failed: {request.method} {request.path} from {request.remote_addr}")
return jsonify({
'success': False,
'error': 'Authentication required',
'message': 'Please provide valid API key via X-API-Key header'
}), 401
return None
# API Endpoints
@app.route('/health', methods=['GET'])
def health_check():
"""Health check endpoint"""
uptime = time.time() - sms_server.start_time
return jsonify({
'status': 'healthy',
'timestamp': datetime.now().isoformat(),
'uptime_seconds': int(uptime),
'messages_sent': sms_server.message_count,
'version': '1.0.0'
})
@app.route('/api/sms/send', methods=['POST'])
def send_sms():
"""Send SMS message via Termux API"""
try:
data = request.get_json()
if not data:
logger.error("SMS send endpoint: No JSON data provided")
return jsonify({'success': False, 'error': 'JSON data required'}), 400
# Extract parameters
phone = data.get('phone', '').strip()
message = data.get('message', '').strip()
name = data.get('name', '')
logger.info(f"SMS send request: phone={phone}, name={name}, message_length={len(message) if message else 0}")
# Validate required fields
if not phone:
logger.error("SMS send validation: Missing phone number")
return jsonify({'success': False, 'error': 'Phone number required'}), 400
if not message:
logger.error("SMS send validation: Missing message")
return jsonify({'success': False, 'error': 'Message required'}), 400
# Message template substitution (like existing ui.sh)
if name and '{name}' in message:
message = message.replace('{name}', name)
# Send SMS
result = sms_server.send_sms(phone, message)
status_code = 200 if result['success'] else 400
logger.info(f"SMS send result: success={result['success']}, error={result.get('error', 'None')}")
return jsonify(result), status_code
except Exception as e:
logger.error(f"SMS send endpoint error: {e}")
return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/api/sms/list', methods=['GET'])
def list_sms():
"""List SMS messages"""
try:
limit = request.args.get('limit', '10')
offset = request.args.get('offset', '0')
command = ['termux-sms-list', '-l', limit, '-o', offset]
result = sms_server.execute_termux_command(command)
if result['success']:
try:
sms_data = json.loads(result['stdout']) if result['stdout'] else []
return jsonify({
'success': True,
'messages': sms_data,
'count': len(sms_data)
})
except json.JSONDecodeError:
return jsonify({
'success': True,
'messages': result['stdout'],
'raw_output': True
})
else:
return jsonify({'success': False, 'error': result['error']}), 400
except Exception as e:
logger.error(f"SMS list error: {e}")
return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/api/device/battery', methods=['GET'])
def get_battery_status():
"""Get device battery status"""
try:
result = sms_server.execute_termux_command(['termux-battery-status'])
if result['success']:
battery_data = json.loads(result['stdout'])
return jsonify({
'success': True,
'battery': battery_data,
'timestamp': datetime.now().isoformat()
})
else:
return jsonify({'success': False, 'error': result['error']}), 400
except Exception as e:
logger.error(f"Battery status error: {e}")
return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/api/device/location', methods=['GET'])
def get_location():
"""Get device GPS location"""
try:
result = sms_server.execute_termux_command(['termux-location'])
if result['success'] and result['stdout']:
try:
location_data = json.loads(result['stdout'])
return jsonify({
'success': True,
'location': location_data,
'timestamp': datetime.now().isoformat()
})
except json.JSONDecodeError:
return jsonify({
'success': True,
'location': result['stdout'],
'raw_output': True
})
else:
return jsonify({'success': False, 'error': 'Location unavailable'}), 400
except Exception as e:
logger.error(f"Location error: {e}")
return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/api/device/info', methods=['GET'])
def get_device_info():
"""Get general device information"""
try:
# Get multiple device stats
battery_result = sms_server.execute_termux_command(['termux-battery-status'])
info = {
'timestamp': datetime.now().isoformat(),
'uptime': time.time() - sms_server.start_time,
'messages_sent': sms_server.message_count,
'api_version': '1.0.0',
'termux_api_available': True
}
if battery_result['success']:
try:
battery_data = json.loads(battery_result['stdout'])
info['battery'] = battery_data
except json.JSONDecodeError:
pass
return jsonify({'success': True, 'device_info': info})
except Exception as e:
logger.error(f"Device info error: {e}")
return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/api/sms/history', methods=['GET'])
def get_sms_history():
"""Get SMS history for conversation sync"""
try:
phone = request.args.get('phone')
limit = request.args.get('limit', 100, type=int)
result = sms_server.get_sms_history(phone, limit)
if result['success']:
return jsonify(result)
else:
return jsonify(result), 400
except Exception as e:
logger.error(f"SMS history error: {e}")
return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/api/sms/inbox', methods=['GET'])
def get_sms_inbox():
"""Get SMS inbox messages (compatibility endpoint)"""
try:
since = request.args.get('since', type=int)
limit = request.args.get('limit', 100, type=int)
result = sms_server.get_sms_history(None, limit)
if result['success']:
messages = result['messages']
# Filter by timestamp if 'since' parameter provided
if since:
filtered_messages = []
for msg in messages:
# Convert received timestamp to compare with since
try:
msg_time = datetime.strptime(msg.get('received', ''), '%Y-%m-%d %H:%M:%S').timestamp()
if msg_time > since:
filtered_messages.append(msg)
except:
# If timestamp parsing fails, include the message
filtered_messages.append(msg)
messages = filtered_messages
return jsonify({
'success': True,
'messages': messages,
'count': len(messages)
})
else:
return jsonify(result), 400
except Exception as e:
logger.error(f"SMS inbox error: {e}")
return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/api/contact/<phone>', methods=['GET'])
def get_contact_info(phone):
"""Get contact information for a phone number"""
try:
contact_name = sms_server.get_contact_name(phone)
return jsonify({
'success': True,
'phone': phone,
'name': contact_name,
'has_name': contact_name is not None
})
except Exception as e:
logger.error(f"Contact info error: {e}")
return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/api/sms/send-reply', methods=['POST'])
def send_reply():
"""Send a reply message with enhanced tracking"""
try:
data = request.get_json()
if not data:
return jsonify({'success': False, 'error': 'No JSON data provided'}), 400
phone = data.get('phone')
message = data.get('message')
conversation_id = data.get('conversation_id')
if not phone or not message:
return jsonify({'success': False, 'error': 'Phone and message required'}), 400
# Rate limiting check
if not sms_server.rate_limit_check():
return jsonify({
'success': False,
'error': f'Rate limited. Wait {CONFIG["RATE_LIMIT_DELAY"]} seconds between messages'
}), 429
# Send via termux-sms-send
command = ['termux-sms-send', '-n', phone, message]
result = sms_server.execute_termux_command(command)
if result['success']:
sms_server.message_count += 1
return jsonify({
'success': True,
'message': 'Reply sent successfully',
'conversation_id': conversation_id,
'timestamp': datetime.now().isoformat(),
'phone': phone,
'message_sent': message
})
else:
return jsonify({
'success': False,
'error': f'Failed to send SMS: {result.get("stderr", "Unknown error")}'
}), 500
except Exception as e:
logger.error(f"Send reply error: {e}")
return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/api/campaign/notify', methods=['POST'])
def campaign_notification():
"""Send notification about campaign status"""
try:
data = request.get_json()
title = data.get('title', 'SMS Campaign')
message = data.get('message', 'Campaign update')
result = sms_server.execute_termux_command([
'termux-notification',
'--title', title,
'--content', message
])
return jsonify({
'success': result['success'],
'error': result.get('error')
})
except Exception as e:
logger.error(f"Notification error: {e}")
return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/api/logs/tail', methods=['GET'])
def tail_logs():
"""Get last N lines from the SMS API log file"""
try:
lines_param = request.args.get('lines', '100', type=str)
try:
num_lines = min(500, max(1, int(lines_param)))
except (ValueError, TypeError):
num_lines = 100
log_path = os.path.expanduser('~/logs/sms-api.log')
if not os.path.isfile(log_path):
return jsonify({
'success': True,
'lines': [],
'total_lines': 0,
'file_size': 0
})
file_size = os.path.getsize(log_path)
with open(log_path, 'r', encoding='utf-8', errors='replace') as f:
all_lines = f.readlines()
total_lines = len(all_lines)
tail_lines = [line.rstrip('\n') for line in all_lines[-num_lines:]]
return jsonify({
'success': True,
'lines': tail_lines,
'total_lines': total_lines,
'file_size': file_size
})
except Exception as e:
logger.error(f"Log tail error: {e}")
return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/api/contacts/test', methods=['GET'])
def test_contacts():
"""Test endpoint to fetch and examine raw contact list structure"""
try:
logger.info("Testing termux-contact-list command...")
result = sms_server.execute_termux_command(['termux-contact-list'])
if not result['success']:
return jsonify({
'success': False,
'error': result.get('error', 'Unknown error'),
'stderr': result.get('stderr', ''),
'return_code': result.get('return_code')
}), 400
# Try to parse JSON
raw_output = result['stdout']
contacts_data = None
parse_error = None
try:
contacts_data = json.loads(raw_output)
except json.JSONDecodeError as e:
parse_error = str(e)
# Analyze structure
analysis = {
'total_contacts': len(contacts_data) if isinstance(contacts_data, list) else 'N/A',
'is_array': isinstance(contacts_data, list),
'sample_contact': None,
'all_fields': set()
}
if isinstance(contacts_data, list) and len(contacts_data) > 0:
# Get first contact as sample
analysis['sample_contact'] = contacts_data[0]
# Collect all unique field names across contacts
for contact in contacts_data[:10]: # Check first 10
if isinstance(contact, dict):
analysis['all_fields'].update(contact.keys())
analysis['all_fields'] = list(analysis['all_fields'])
return jsonify({
'success': True,
'raw_output': raw_output,
'parsed_data': contacts_data,
'parse_error': parse_error,
'analysis': analysis,
'timestamp': datetime.now().isoformat()
})
except Exception as e:
logger.error(f"Contact test error: {e}")
return jsonify({
'success': False,
'error': str(e),
'traceback': str(e.__traceback__)
}), 500
@app.route('/api/contacts/list', methods=['GET'])
def list_contacts():
"""List all contacts from phone"""
try:
result = sms_server.execute_termux_command(['termux-contact-list'])
if result['success'] and result['stdout']:
try:
contacts = json.loads(result['stdout'])
# Optional filtering
search = request.args.get('search', '').lower()
if search:
contacts = [c for c in contacts
if search in str(c.get('name', '')).lower()
or search in str(c.get('number', '')).lower()]
return jsonify({
'success': True,
'contacts': contacts,
'count': len(contacts),
'timestamp': datetime.now().isoformat()
})
except json.JSONDecodeError as e:
return jsonify({
'success': False,
'error': f'Failed to parse contact data: {str(e)}',
'raw_output': result['stdout']
}), 500
else:
return jsonify({
'success': False,
'error': result.get('error', 'Failed to retrieve contacts'),
'stderr': result.get('stderr')
}), 400
except Exception as e:
logger.error(f"Contact list error: {e}")
return jsonify({'success': False, 'error': str(e)}), 500
if __name__ == '__main__':
# Validate required configuration
if not CONFIG['SECRET_KEY']:
logger.critical("SECURITY ERROR: SMS_API_SECRET or TERMUX_API_KEY environment variable is required!")
logger.critical("Set SMS_API_SECRET environment variable before starting the server")
logger.critical("Generate a secure key with: python -c \"import secrets; print(secrets.token_hex(32))\"")
print("\n" + "="*80)
print("FATAL ERROR: Missing required security configuration")
print("="*80)
print("SMS_API_SECRET environment variable must be set for authentication.")
print("This server cannot start without proper API key configuration.")
print("\nTo fix this:")
print("1. Generate a secure key: python -c \"import secrets; print(secrets.token_hex(32))\"")
print("2. Set environment variable: export SMS_API_SECRET='your-generated-key'")
print("3. Add to your shell profile or systemd service file")
print("="*80 + "\n")
exit(1)
logger.info("Starting Termux SMS API Server...")
logger.info("API authentication configured (all remote endpoints protected)")
logger.info(f"Available commands: {CONFIG['ALLOWED_COMMANDS']}")
# Get local IP for display (secure method without shell=True)
try:
import socket
# Get IP by connecting to external host (doesn't actually send data)
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(('8.8.8.8', 80))
local_ip = s.getsockname()[0]
s.close()
except Exception as e:
logger.warning(f"Could not determine IP address: {e}")
local_ip = '0.0.0.0'
print(f"""
Termux SMS API Server Starting
Device IP: {local_ip}
API Base URL: http://{local_ip}:5001
All endpoints require X-API-Key header (except localhost)
Access from server:
curl -H "X-API-Key: $SMS_API_SECRET" http://{local_ip}:5001/health
""")
app.run(host='0.0.0.0', port=5001, debug=False)