#!/usr/bin/env python3
"""
Termux SMS API Server
Bridges SMS Campaign Manager (Ubuntu) with Termux API (Android)

This server runs on the Android device in Termux and provides REST API endpoints
for the main SMS Campaign Manager to send messages using native Android SMS
capabilities instead of ADB automation.

All endpoints require API key authentication via X-API-Key header.
Localhost requests (127.0.0.1, ::1) are exempt for watchdog health checks.
"""

import hashlib
import hmac
import json
import logging
import os
import subprocess
import time
from datetime import datetime
from typing import Dict, List, Optional, Any

from flask import Flask, request, jsonify

# Configure logging — ensure log directory exists before creating FileHandler
_log_dir = os.path.expanduser('~/logs')
os.makedirs(_log_dir, exist_ok=True)

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(os.path.join(_log_dir, 'sms-api.log')),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

app = Flask(__name__)

# Configuration
CONFIG = {
    # Shared secret used both as the API key and as the HMAC key.
    'SECRET_KEY': os.environ.get('SMS_API_SECRET') or os.environ.get('TERMUX_API_KEY', ''),
    'MAX_MESSAGE_LENGTH': 1600,  # Increased from 160 to support longer messages (SMS can be up to 1600 chars)
    'RATE_LIMIT_DELAY': 1.0,  # Reduced from 2.0 to 1.0 seconds between messages for faster campaigns
    # Only executables named here may be launched by execute_termux_command().
    'ALLOWED_COMMANDS': [
        'termux-sms-send',
        'termux-sms-list',
        'termux-battery-status',
        'termux-location',
        'termux-notification',
        'termux-contact-list'
    ]
}

# Translation table that deletes the punctuation commonly found in phone numbers.
_PHONE_STRIP_TABLE = str.maketrans('', '', '+- ()')


def _normalize_phone(phone: Any) -> str:
    """Return *phone* with '+', '-', spaces and parentheses removed.

    ``None`` and other falsy values normalise to the empty string so callers
    can compare numbers without guarding against missing fields.
    """
    return str(phone or '').translate(_PHONE_STRIP_TABLE)


def _contact_numbers(contact: Dict[str, Any]) -> List[str]:
    """Collect every normalised phone number stored on one contact record.

    Handles both a flat ``number`` field and a nested ``phoneNumbers`` list of
    ``{'number': ...}`` entries.
    """
    numbers = []
    if 'number' in contact:
        numbers.append(_normalize_phone(contact['number']))
    for entry in contact.get('phoneNumbers') or []:
        raw = entry.get('number', '') if isinstance(entry, dict) else entry
        numbers.append(_normalize_phone(raw))
    return numbers


class SMSApiServer:
    """Main SMS API server class.

    Wraps the Termux command-line tools and keeps the small amount of state
    the HTTP layer reports: send counter, start time and last send time.
    """

    def __init__(self):
        self.last_send_time = 0   # time.time() of the last accepted send
        self.message_count = 0    # number of successfully sent messages
        self.start_time = time.time()

    def authenticate_request(self, request_data: str, signature: str) -> bool:
        """Verify HMAC signature for request authentication.

        Args:
            request_data: The raw payload that was signed.
            signature: Hex-encoded HMAC-SHA256 supplied by the client.

        Returns:
            True when the signature matches; False on mismatch or any error.
        """
        try:
            expected_signature = hmac.new(
                CONFIG['SECRET_KEY'].encode(),
                request_data.encode(),
                hashlib.sha256
            ).hexdigest()
            # Compare as bytes: compare_digest rejects non-ASCII str operands.
            return hmac.compare_digest(signature.encode(), expected_signature.encode())
        except Exception as e:
            logger.error(f"Authentication error: {e}")
            return False

    def execute_termux_command(self, command: List[str]) -> Dict[str, Any]:
        """Execute Termux API command with error handling.

        Args:
            command: argv list; ``command[0]`` must be in ALLOWED_COMMANDS.

        Returns:
            On completion: ``success``, ``stdout``, ``stderr``, ``return_code``.
            On rejection, timeout or launch failure: ``success`` False and an
            ``error`` string (no ``stdout``/``stderr`` keys).
        """
        if not command or command[0] not in CONFIG['ALLOWED_COMMANDS']:
            return {'success': False, 'error': 'Command not allowed'}

        try:
            logger.info(f"Executing: {' '.join(command)}")
            result = subprocess.run(
                command,
                capture_output=True,
                text=True,
                timeout=30
            )
            return {
                'success': result.returncode == 0,
                'stdout': result.stdout.strip(),
                'stderr': result.stderr.strip(),
                'return_code': result.returncode
            }
        except subprocess.TimeoutExpired:
            return {'success': False, 'error': 'Command timeout'}
        except Exception as e:
            return {'success': False, 'error': str(e)}

    def get_sms_history(self, phone: Optional[str] = None, limit: int = 100) -> Dict[str, Any]:
        """Get SMS history for a specific phone number or all messages.

        Args:
            phone: When given, only messages whose normalised ``number``
                equals the normalised *phone* are returned.
            limit: Passed to ``termux-sms-list -l``; a falsy value omits it.

        Returns:
            ``{'success': True, 'messages': [...], 'count': n}`` or
            ``{'success': False, 'error': ...}``.
        """
        try:
            command = ['termux-sms-list']
            if limit:
                command.extend(['-l', str(limit)])

            result = self.execute_termux_command(command)
            if not (result['success'] and result.get('stdout')):
                return {'success': False, 'error': 'Failed to retrieve SMS history'}

            try:
                messages = json.loads(result['stdout'])
            except json.JSONDecodeError:
                return {'success': False, 'error': 'Failed to parse SMS data'}

            # Filter by phone number if specified
            if phone:
                clean_phone = _normalize_phone(phone)
                messages = [
                    msg for msg in messages
                    if _normalize_phone(msg.get('number')) == clean_phone
                ]

            return {
                'success': True,
                'messages': messages,
                'count': len(messages)
            }
        except Exception as e:
            logger.error(f"Error getting SMS history: {e}")
            return {'success': False, 'error': str(e)}

    def get_contact_name(self, phone: str) -> Optional[str]:
        """Get contact name from phone's contact list.

        Returns:
            The ``name`` of the first contact with a number matching *phone*
            after normalisation, or None when nothing matches or the contact
            list cannot be read.
        """
        try:
            clean_phone = _normalize_phone(phone)
            if not clean_phone:
                # An empty query would otherwise match contacts with no number.
                return None

            result = self.execute_termux_command(['termux-contact-list'])
            if not (result['success'] and result.get('stdout')):
                return None

            try:
                contacts = json.loads(result['stdout'])
            except json.JSONDecodeError:
                return None

            for contact in contacts:
                if isinstance(contact, dict) and clean_phone in _contact_numbers(contact):
                    return contact.get('name')
            return None
        except Exception as e:
            logger.error(f"Error getting contact name for {phone}: {e}")
            return None

    def rate_limit_check(self) -> bool:
        """Check if enough time has passed since last message.

        Returns True and records the current time when a send is allowed;
        returns False (without updating state) when called too soon.
        """
        current_time = time.time()
        if current_time - self.last_send_time < CONFIG['RATE_LIMIT_DELAY']:
            return False
        self.last_send_time = current_time
        return True

    def send_sms(self, phone: str, message: str) -> Dict[str, Any]:
        """Send SMS using Termux API.

        Validates the input, applies rate limiting, runs ``termux-sms-send``
        and, on success, posts a confirmation notification.

        Returns:
            ``{'success': False, 'error': ...}`` when validation or rate
            limiting rejects the request; otherwise a dict with ``success``,
            ``error``, ``timestamp``, ``phone``, ``message_length`` and
            ``total_sent``.
        """
        # Input validation
        if not phone or not message:
            error_msg = 'Phone and message required'
            logger.error(f"SMS validation failed: {error_msg}")
            return {'success': False, 'error': error_msg}

        # Clean phone number
        clean_phone = _normalize_phone(phone)
        if not clean_phone:
            # Input such as "+" or "()" leaves nothing to dial.
            error_msg = 'Invalid phone number'
            logger.error(f"SMS validation failed: {error_msg}")
            return {'success': False, 'error': error_msg}

        if len(message) > CONFIG['MAX_MESSAGE_LENGTH']:
            error_msg = f'Message too long ({len(message)} chars, max {CONFIG["MAX_MESSAGE_LENGTH"]})'
            logger.error(f"SMS validation failed: {error_msg}")
            return {'success': False, 'error': error_msg}

        # Rate limiting
        if not self.rate_limit_check():
            error_msg = 'Rate limit exceeded, please wait'
            logger.warning(f"SMS rate limited for {phone}")
            return {'success': False, 'error': error_msg}

        # Log the SMS attempt
        logger.info(f"Attempting to send SMS to {phone} (length: {len(message)} chars)")

        # Execute SMS send command
        command = ['termux-sms-send', '-n', clean_phone, message]
        result = self.execute_termux_command(command)

        # 'error' is set for launch failures, 'stderr' for non-zero exits.
        failure_reason = result.get('error') or result.get('stderr')

        if result['success']:
            self.message_count += 1
            logger.info(f"SMS sent successfully to {phone}: {message[:50]}...")

            # Send confirmation notification
            self.execute_termux_command([
                'termux-notification',
                '--title', 'SMS Sent',
                '--content', f'Message sent to {phone}'
            ])
        else:
            logger.error(f"SMS send failed to {phone}: {failure_reason or 'Unknown error'}")

        return {
            'success': result['success'],
            'error': failure_reason,
            'timestamp': datetime.now().isoformat(),
            'phone': phone,
            'message_length': len(message),
            'total_sent': self.message_count
        }


# Global server instance
sms_server = SMSApiServer()


def verify_api_key():
    """Verify API key from request headers.

    Reads the key from ``X-API-Key`` or, failing that, from an
    ``Authorization: Bearer ...`` header.

    Returns:
        True only when a key is configured, a key was supplied, and the two
        match in a constant-time comparison.
    """
    api_key = request.headers.get('X-API-Key') or request.headers.get('Authorization', '').replace('Bearer ', '')
    expected_key = CONFIG['SECRET_KEY']

    # Fail closed when either side is missing.
    if not api_key or not expected_key:
        return False

    # Compare bytes: compare_digest raises TypeError for non-ASCII str, which
    # would turn a bad header into a 500 instead of a 401.
    return hmac.compare_digest(api_key.encode('utf-8'), expected_key.encode('utf-8'))


# ---------------------------------------------------------------------------
# Global auth: every request from a remote client must provide a valid API key.
# Localhost (127.0.0.1 / ::1) is exempt so the watchdog health check works.
# ---------------------------------------------------------------------------
@app.before_request
def require_api_key():
    """Enforce API key authentication on all endpoints for remote clients.

    Returns None to let the request proceed, or a 401 JSON response.
    """
    if request.remote_addr in ('127.0.0.1', '::1'):
        return None  # allow localhost (watchdog health checks)

    if not verify_api_key():
        logger.warning(f"Auth failed: {request.method} {request.path} from {request.remote_addr}")
        return jsonify({
            'success': False,
            'error': 'Authentication required',
            'message': 'Please provide valid API key via X-API-Key header'
        }), 401

    return None


# API Endpoints

@app.route('/health', methods=['GET'])
def health_check():
    """Health check endpoint"""
    uptime = time.time() - sms_server.start_time
    return jsonify({
        'status': 'healthy',
        'timestamp': datetime.now().isoformat(),
        'uptime_seconds': int(uptime),
        'messages_sent': sms_server.message_count,
        'version': '1.0.0'
    })


@app.route('/api/sms/send', methods=['POST'])
def send_sms():
    """Send SMS message via Termux API.

    Expects a JSON object with ``phone`` and ``message`` and an optional
    ``name`` that replaces ``{name}`` in the message. Responds 200 on
    success, 400 on validation or send failure, 500 on unexpected errors.
    """
    try:
        # silent=True: a missing or malformed body yields None, not an exception.
        data = request.get_json(silent=True)
        if not data or not isinstance(data, dict):
            logger.error("SMS send endpoint: No JSON data provided")
            return jsonify({'success': False, 'error': 'JSON data required'}), 400

        # Extract parameters; coerce to str so numeric JSON values do not crash .strip()
        phone = str(data.get('phone') or '').strip()
        message = str(data.get('message') or '').strip()
        name = str(data.get('name') or '')

        logger.info(f"SMS send request: phone={phone}, name={name}, message_length={len(message)}")

        # Validate required fields
        if not phone:
            logger.error("SMS send validation: Missing phone number")
            return jsonify({'success': False, 'error': 'Phone number required'}), 400

        if not message:
            logger.error("SMS send validation: Missing message")
            return jsonify({'success': False, 'error': 'Message required'}), 400

        # Message template substitution (like existing ui.sh)
        if name and '{name}' in message:
            message = message.replace('{name}', name)

        # Send SMS
        result = sms_server.send_sms(phone, message)

        status_code = 200 if result['success'] else 400
        logger.info(f"SMS send result: success={result['success']}, error={result.get('error', 'None')}")
        return jsonify(result), status_code

    except Exception as e:
        logger.error(f"SMS send endpoint error: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500


@app.route('/api/sms/list', methods=['GET'])
def list_sms():
    """List SMS messages.

    Query parameters ``limit`` (default 10) and ``offset`` (default 0) are
    parsed as integers; unparseable values fall back to the default and
    negative values are clamped to 0.
    """
    try:
        # Parse as int so arbitrary query text is never forwarded as an argument.
        limit = max(0, request.args.get('limit', 10, type=int))
        offset = max(0, request.args.get('offset', 0, type=int))

        command = ['termux-sms-list', '-l', str(limit), '-o', str(offset)]
        result = sms_server.execute_termux_command(command)

        if not result['success']:
            # 'error' exists only for launch failures; non-zero exits carry 'stderr'.
            reason = result.get('error') or result.get('stderr') or 'Failed to list SMS messages'
            return jsonify({'success': False, 'error': reason}), 400

        try:
            sms_data = json.loads(result['stdout']) if result['stdout'] else []
            return jsonify({
                'success': True,
                'messages': sms_data,
                'count': len(sms_data)
            })
        except json.JSONDecodeError:
            # Hand back the raw text when the tool did not emit JSON.
            return jsonify({
                'success': True,
                'messages': result['stdout'],
                'raw_output': True
            })

    except Exception as e:
        logger.error(f"SMS list error: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500


@app.route('/api/device/battery', methods=['GET'])
def get_battery_status():
    """Get device battery status"""
    try:
        result = sms_server.execute_termux_command(['termux-battery-status'])

        if not result['success']:
            # 'error' exists only for launch failures; non-zero exits carry 'stderr'.
            reason = result.get('error') or result.get('stderr') or 'Battery status unavailable'
            return jsonify({'success': False, 'error': reason}), 400

        battery_data = json.loads(result['stdout'])
        return jsonify({
            'success': True,
            'battery': battery_data,
            'timestamp': datetime.now().isoformat()
        })

    except Exception as e:
        logger.error(f"Battery status error: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500


@app.route('/api/device/location', methods=['GET'])
def get_location():
    """Get device GPS location"""
    try:
        result = sms_server.execute_termux_command(['termux-location'])

        if not (result['success'] and result['stdout']):
            return jsonify({'success': False, 'error': 'Location unavailable'}), 400

        try:
            location_data = json.loads(result['stdout'])
            return jsonify({
                'success': True,
                'location': location_data,
                'timestamp': datetime.now().isoformat()
            })
        except json.JSONDecodeError:
            return jsonify({
                'success': True,
                'location': result['stdout'],
                'raw_output': True
            })

    except Exception as e:
        logger.error(f"Location error: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500


@app.route('/api/device/info', methods=['GET'])
def get_device_info():
    """Get general device information.

    Always reports uptime and counters; battery data is attached only when
    ``termux-battery-status`` succeeds and returns valid JSON.
    """
    try:
        # Get multiple device stats
        battery_result = sms_server.execute_termux_command(['termux-battery-status'])

        info = {
            'timestamp': datetime.now().isoformat(),
            'uptime': time.time() - sms_server.start_time,
            'messages_sent': sms_server.message_count,
            'api_version': '1.0.0',
            'termux_api_available': True
        }

        if battery_result['success']:
            try:
                info['battery'] = json.loads(battery_result['stdout'])
            except json.JSONDecodeError:
                pass  # battery info is optional

        return jsonify({'success': True, 'device_info': info})

    except Exception as e:
        logger.error(f"Device info error: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500


@app.route('/api/sms/history', methods=['GET'])
def get_sms_history():
    """Get SMS history for conversation sync"""
    try:
        phone = request.args.get('phone')
        limit = request.args.get('limit', 100, type=int)

        result = sms_server.get_sms_history(phone, limit)

        if result['success']:
            return jsonify(result)
        return jsonify(result), 400

    except Exception as e:
        logger.error(f"SMS history error: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500


@app.route('/api/sms/inbox', methods=['GET'])
def get_sms_inbox():
    """Get SMS inbox messages (compatibility endpoint).

    Optional ``since`` (integer, compared against the parsed ``received``
    timestamp) keeps only newer messages; messages whose timestamp cannot be
    parsed are kept rather than dropped.
    """
    try:
        since = request.args.get('since', type=int)
        limit = request.args.get('limit', 100, type=int)

        result = sms_server.get_sms_history(None, limit)

        if not result['success']:
            return jsonify(result), 400

        messages = result['messages']

        # Filter by timestamp if 'since' parameter provided
        if since:
            filtered_messages = []
            for msg in messages:
                try:
                    msg_time = datetime.strptime(msg.get('received', ''), '%Y-%m-%d %H:%M:%S').timestamp()
                except (AttributeError, TypeError, ValueError):
                    # If timestamp parsing fails, include the message
                    filtered_messages.append(msg)
                    continue
                if msg_time > since:
                    filtered_messages.append(msg)
            messages = filtered_messages

        return jsonify({
            'success': True,
            'messages': messages,
            'count': len(messages)
        })

    except Exception as e:
        logger.error(f"SMS inbox error: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500


# The URL variable supplies the view's required `phone` argument.
@app.route('/api/contact/<phone>', methods=['GET'])
def get_contact_info(phone):
    """Get contact information for a phone number"""
    try:
        contact_name = sms_server.get_contact_name(phone)

        return jsonify({
            'success': True,
            'phone': phone,
            'name': contact_name,
            'has_name': contact_name is not None
        })

    except Exception as e:
        logger.error(f"Contact info error: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500


@app.route('/api/sms/send-reply', methods=['POST'])
def send_reply():
    """Send a reply message with enhanced tracking.

    Expects a JSON object with ``phone``, ``message`` and an optional
    ``conversation_id`` that is echoed back. Responds 400 on invalid input,
    429 when rate limited, 500 when the send command fails.
    """
    try:
        data = request.get_json(silent=True)
        if not data or not isinstance(data, dict):
            return jsonify({'success': False, 'error': 'No JSON data provided'}), 400

        phone = data.get('phone')
        message = data.get('message')
        conversation_id = data.get('conversation_id')

        if not phone or not message:
            return jsonify({'success': False, 'error': 'Phone and message required'}), 400

        # subprocess arguments must be strings; reject other JSON types up front.
        if not isinstance(phone, str) or not isinstance(message, str):
            return jsonify({'success': False, 'error': 'Phone and message must be strings'}), 400

        # Same length limit the regular send path enforces.
        if len(message) > CONFIG['MAX_MESSAGE_LENGTH']:
            return jsonify({
                'success': False,
                'error': f'Message too long ({len(message)} chars, max {CONFIG["MAX_MESSAGE_LENGTH"]})'
            }), 400

        # Rate limiting check
        if not sms_server.rate_limit_check():
            return jsonify({
                'success': False,
                'error': f'Rate limited. Wait {CONFIG["RATE_LIMIT_DELAY"]} seconds between messages'
            }), 429

        # Send via termux-sms-send
        command = ['termux-sms-send', '-n', phone, message]
        result = sms_server.execute_termux_command(command)

        if result['success']:
            sms_server.message_count += 1

            return jsonify({
                'success': True,
                'message': 'Reply sent successfully',
                'conversation_id': conversation_id,
                'timestamp': datetime.now().isoformat(),
                'phone': phone,
                'message_sent': message
            })

        # Timeouts and launch failures populate 'error'; non-zero exits 'stderr'.
        reason = result.get('error') or result.get('stderr') or 'Unknown error'
        return jsonify({
            'success': False,
            'error': f'Failed to send SMS: {reason}'
        }), 500

    except Exception as e:
        logger.error(f"Send reply error: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500


@app.route('/api/campaign/notify', methods=['POST'])
def campaign_notification():
    """Send notification about campaign status.

    Expects a JSON object; ``title`` and ``message`` are optional and fall
    back to generic defaults.
    """
    try:
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'success': False, 'error': 'JSON data required'}), 400

        # Coerce to str: the values become subprocess arguments.
        title = str(data.get('title', 'SMS Campaign'))
        message = str(data.get('message', 'Campaign update'))

        result = sms_server.execute_termux_command([
            'termux-notification',
            '--title', title,
            '--content', message
        ])

        return jsonify({
            'success': result['success'],
            'error': result.get('error')
        })

    except Exception as e:
        logger.error(f"Notification error: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500


@app.route('/api/logs/tail', methods=['GET'])
def tail_logs():
    """Get last N lines from the SMS API log file.

    ``lines`` is clamped to 1..500 and defaults to 100 when missing or not
    an integer. A missing log file yields an empty, successful response.
    """
    from collections import deque

    try:
        lines_param = request.args.get('lines', '100', type=str)
        try:
            num_lines = min(500, max(1, int(lines_param)))
        except (ValueError, TypeError):
            num_lines = 100

        log_path = os.path.expanduser('~/logs/sms-api.log')

        if not os.path.isfile(log_path):
            return jsonify({
                'success': True,
                'lines': [],
                'total_lines': 0,
                'file_size': 0
            })

        file_size = os.path.getsize(log_path)

        # Stream the file, retaining only the last num_lines lines in memory.
        tail = deque(maxlen=num_lines)
        total_lines = 0
        with open(log_path, 'r', encoding='utf-8', errors='replace') as f:
            for line in f:
                total_lines += 1
                tail.append(line)

        tail_lines = [line.rstrip('\n') for line in tail]

        return jsonify({
            'success': True,
            'lines': tail_lines,
            'total_lines': total_lines,
            'file_size': file_size
        })

    except Exception as e:
        logger.error(f"Log tail error: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500


@app.route('/api/contacts/test', methods=['GET'])
def test_contacts():
    """Test endpoint to fetch and examine raw contact list structure.

    Returns the raw command output, the parsed JSON (if any), the parse
    error (if any) and a small structural analysis of the first contacts.
    """
    import traceback

    try:
        logger.info("Testing termux-contact-list command...")
        result = sms_server.execute_termux_command(['termux-contact-list'])

        if not result['success']:
            return jsonify({
                'success': False,
                'error': result.get('error', 'Unknown error'),
                'stderr': result.get('stderr', ''),
                'return_code': result.get('return_code')
            }), 400

        # Try to parse JSON
        raw_output = result['stdout']
        contacts_data = None
        parse_error = None

        try:
            contacts_data = json.loads(raw_output)
        except json.JSONDecodeError as e:
            parse_error = str(e)

        # Analyze structure
        is_array = isinstance(contacts_data, list)
        sample_contact = None
        field_names = set()

        if is_array and contacts_data:
            # Get first contact as sample
            sample_contact = contacts_data[0]

            # Collect all unique field names across contacts
            for contact in contacts_data[:10]:  # Check first 10
                if isinstance(contact, dict):
                    field_names.update(contact.keys())

        analysis = {
            'total_contacts': len(contacts_data) if is_array else 'N/A',
            'is_array': is_array,
            'sample_contact': sample_contact,
            # Always a list: a set is not JSON serialisable.
            'all_fields': sorted(field_names)
        }

        return jsonify({
            'success': True,
            'raw_output': raw_output,
            'parsed_data': contacts_data,
            'parse_error': parse_error,
            'analysis': analysis,
            'timestamp': datetime.now().isoformat()
        })

    except Exception as e:
        logger.error(f"Contact test error: {e}")
        return jsonify({
            'success': False,
            'error': str(e),
            # Formatted text; str() of a traceback object is only its repr.
            'traceback': traceback.format_exc()
        }), 500


@app.route('/api/contacts/list', methods=['GET'])
def list_contacts():
    """List all contacts from phone.

    Optional ``search`` query parameter keeps contacts whose ``name`` or
    ``number`` contains the text (case-insensitive).
    """
    try:
        result = sms_server.execute_termux_command(['termux-contact-list'])

        if not (result['success'] and result['stdout']):
            return jsonify({
                'success': False,
                'error': result.get('error', 'Failed to retrieve contacts'),
                'stderr': result.get('stderr')
            }), 400

        try:
            contacts = json.loads(result['stdout'])
        except json.JSONDecodeError as e:
            return jsonify({
                'success': False,
                'error': f'Failed to parse contact data: {str(e)}',
                'raw_output': result['stdout']
            }), 500

        # Optional filtering
        search = request.args.get('search', '').lower()
        if search:
            contacts = [
                c for c in contacts
                if search in str(c.get('name', '')).lower()
                or search in str(c.get('number', '')).lower()
            ]

        return jsonify({
            'success': True,
            'contacts': contacts,
            'count': len(contacts),
            'timestamp': datetime.now().isoformat()
        })

    except Exception as e:
        logger.error(f"Contact list error: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500


if __name__ == '__main__':
    import socket

    # Validate required configuration
    if not CONFIG['SECRET_KEY']:
        logger.critical("SECURITY ERROR: SMS_API_SECRET or TERMUX_API_KEY environment variable is required!")
        logger.critical("Set SMS_API_SECRET environment variable before starting the server")
        logger.critical("Generate a secure key with: python -c \"import secrets; print(secrets.token_hex(32))\"")
        print("\n" + "="*80)
        print("FATAL ERROR: Missing required security configuration")
        print("="*80)
        print("SMS_API_SECRET environment variable must be set for authentication.")
        print("This server cannot start without proper API key configuration.")
        print("\nTo fix this:")
        print("1. Generate a secure key: python -c \"import secrets; print(secrets.token_hex(32))\"")
        print("2. Set environment variable: export SMS_API_SECRET='your-generated-key'")
        print("3. Add to your shell profile or systemd service file")
        print("="*80 + "\n")
        # SystemExit rather than exit(): the latter is only injected by `site`.
        raise SystemExit(1)

    logger.info("Starting Termux SMS API Server...")
    logger.info("API authentication configured (all remote endpoints protected)")
    logger.info(f"Available commands: {CONFIG['ALLOWED_COMMANDS']}")

    # Get local IP for display (secure method without shell=True)
    try:
        # Get IP by connecting to external host (doesn't actually send data);
        # the context manager closes the socket even when connect() fails.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            s.connect(('8.8.8.8', 80))
            local_ip = s.getsockname()[0]
    except Exception as e:
        logger.warning(f"Could not determine IP address: {e}")
        local_ip = '0.0.0.0'

    print(f"""
Termux SMS API Server Starting
Device IP: {local_ip}
API Base URL: http://{local_ip}:5001
All endpoints require X-API-Key header (except localhost)

Access from server:
curl -H "X-API-Key: $SMS_API_SECRET" http://{local_ip}:5001/health
""")

    app.run(host='0.0.0.0', port=5001, debug=False)