254 lines
9.1 KiB
Python
254 lines
9.1 KiB
Python
import logging
|
|
import sys
|
|
import os
|
|
# Make the parent directory importable so the settings_* modules can be found.
_PARENT_DIR = os.path.join(os.path.dirname(__file__), '..')
sys.path.insert(0, _PARENT_DIR)

# Select the settings module from the MODE environment variable
# (case-insensitive; defaults to DEVELOPMENT when unset).
# NOTE(review): only the exact value 'PRODUCTION' selects the production
# settings here, while the startup log handler in this file also treats
# 'PROD' as production -- confirm which spelling(s) are intended.
mode = os.environ.get('MODE', 'DEVELOPMENT').upper()
if mode == 'PRODUCTION':
    from settings_prod import settings
elif mode == 'LOCAL':
    from settings_local import settings
else:
    # Any other value falls back to the development settings.
    from settings_dev import settings
|
|
|
|
# Configure logging
def setup_logging():
    """Install a single stdout handler on the root logger.

    The level is INFO when the module-level ``mode`` is 'PRODUCTION' and
    DEBUG otherwise. Any handlers already attached to the root logger are
    removed first, and the 'uvicorn.access' logger is raised to WARNING.

    Returns:
        The 'eamco_voipms' logger.
    """
    level = logging.INFO if mode == 'PRODUCTION' else logging.DEBUG

    stdout_handler = logging.StreamHandler(sys.stdout)
    stdout_handler.setLevel(level)
    stdout_handler.setFormatter(
        logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            datefmt='%Y-%m-%d %H:%M:%S',
        )
    )

    root = logging.getLogger()
    root.setLevel(level)
    # Drop previously installed handlers so records are not emitted twice.
    root.handlers.clear()
    root.addHandler(stdout_handler)

    # Keep per-request access lines out of the output unless they are warnings.
    logging.getLogger('uvicorn.access').setLevel(logging.WARNING)

    return logging.getLogger('eamco_voipms')


logger = setup_logging()
|
|
|
|
import uuid
|
|
import requests
|
|
from fastapi import FastAPI, HTTPException, status, Request
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
from starlette.middleware.base import BaseHTTPMiddleware
|
|
from starlette.responses import JSONResponse
|
|
from .voipms_client import update_did_routing, get_forwardings
|
|
from .database import Session
|
|
from .models import Call
|
|
from sqlalchemy import text
|
|
|
|
def check_db_connection():
    """
    Test database connectivity.

    Opens a session, runs ``SELECT 1`` and closes the session again.

    Returns:
        True when the query (and the session close) succeed, False when any
        exception is raised. This function never raises; the failure reason
        is logged as a warning with its traceback.
    """
    try:
        db = Session()
        try:
            db.execute(text("SELECT 1"))
        finally:
            # Always release the session, even when the probe query fails.
            db.close()
        return True
    except Exception:
        # Best-effort health check: report the reason instead of hiding it.
        logger.warning("Database connectivity check failed", exc_info=True)
        return False
|
|
|
|
# class AuthMiddleware(BaseHTTPMiddleware):
|
|
# async def dispatch(self, request, call_next):
|
|
# auth_header = request.headers.get('authorization')
|
|
# expected_token = f"Bearer {settings.voipms_api_token}"
|
|
# if auth_header != expected_token:
|
|
# return JSONResponse(status_code=401, content={"detail": "Unauthorized"})
|
|
# response = await call_next(request)
|
|
# return response
|
|
|
|
# The FastAPI application object; routes and middleware below attach to it.
app = FastAPI(
    title="EAMCO VoIP.ms Controller",
    description="An API to manage routing for a VoIP.ms DID.",
    version="1.0.0",
)
|
|
|
|
|
|
# Request ID middleware for request tracking/correlation
class RequestIDMiddleware(BaseHTTPMiddleware):
    """Tag every request with an ID and echo it back on the response."""

    async def dispatch(self, request: Request, call_next):
        """Propagate or create the request ID around the downstream call.

        A non-empty incoming ``X-Request-ID`` header is reused; otherwise the
        first 8 hex characters of a random UUID4 are used. The ID is stored
        on ``request.state.request_id`` and set as the ``X-Request-ID``
        response header.
        """
        incoming_id = request.headers.get("X-Request-ID")
        request_id = incoming_id if incoming_id else uuid.uuid4().hex[:8]
        request.state.request_id = request_id

        response = await call_next(request)
        response.headers["X-Request-ID"] = request_id
        return response
|
|
|
|
app.add_middleware(RequestIDMiddleware)

# CORS policy: origins come from settings; methods and headers are fixed here.
_CORS_METHODS = ["GET", "POST", "PUT", "DELETE", "OPTIONS"]
_CORS_HEADERS = [
    "Authorization",
    "Content-Type",
    "Accept",
    "Origin",
    "X-Requested-With",
    "X-Request-ID",
]
app.add_middleware(
    CORSMiddleware,
    allow_origins=settings.origins,
    allow_credentials=True,
    allow_methods=_CORS_METHODS,
    allow_headers=_CORS_HEADERS,
)
|
|
|
|
@app.get("/", tags=["General"])
def read_root():
    """Return a welcome message naming the configured target DID.

    Serves as a simple check that the API is up.
    """
    greeting = f"Welcome to the VoIP.ms API for DID: {settings.target_did}"
    return {"message": greeting}
|
|
|
|
# app.add_middleware(AuthMiddleware)
|
|
@app.get("/test/forwardings")
def test_get_forwardings():
    """Look up the forwarding entry for the configured cellphone #1.

    Returns:
        A dict with a human-readable ``message`` and the value returned by
        ``get_forwardings`` under ``voipms_response``.

    Any HTTPException raised by ``get_forwardings`` propagates unchanged.
    """
    voipms_response = get_forwardings(phone_number=settings.target_cellphone_1)
    return {
        "message": f"Forwarding entry for {settings.target_cellphone_1}",
        "voipms_response": voipms_response,
    }
|
|
|
|
|
|
@app.get("/test/did")
def test_did_info():
    """Query the VoIP.ms ``getDIDsInfo`` method for the configured target DID.

    Returns:
        The decoded JSON body of the VoIP.ms response on success. On failure,
        a dict with an ``error`` message and the request ``params``; the API
        password is masked in both so that it is never sent back to the
        caller.
    """
    from urllib.parse import quote_plus

    params = {
        "api_username": settings.voipms_api_username,
        "api_password": settings.voipms_api_password,
        "method": "getDIDsInfo",  # Correct: plural "DIDs"
        "did": settings.target_did,  # Filters to one DID
    }

    # Copy of the parameters that is safe to include in a response body.
    safe_params = {**params, "api_password": "***"}
    password = str(params["api_password"] or "")

    def _redact(message):
        """Mask the API password (raw and URL-encoded) inside *message*."""
        # Exception text from requests can include the full request URL,
        # whose query string carries the password.
        for secret in (password, quote_plus(password)):
            if secret:
                message = message.replace(secret, "***")
        return message

    try:
        # A timeout keeps a stalled upstream from hanging this worker forever.
        response = requests.get(settings.voipms_api_url, params=params, timeout=10)
        return response.json()
    except requests.exceptions.RequestException as e:
        return {
            "error": f"Failed to connect to VoIP.ms API: {_redact(str(e))}",
            "params": safe_params,
        }
    except Exception as e:
        return {
            "error": f"Unexpected error: {_redact(str(e))}",
            "params": safe_params,
        }
|
|
|
|
@app.post("/route/main", status_code=status.HTTP_200_OK, tags=["DID Routing"])
def route_to_main_account():
    """
    Routes the target DID to account 407323.

    The account ID is hard-coded in this handler rather than read from
    settings (presumably it is the main VoIP.ms account -- confirm). After the
    routing update, a Call row is stored with the account ID as its
    ``current_phone``.

    This handler is named distinctly from the /route/sip handler so the two
    do not share one function name.

    Returns:
        A dict with a confirmation ``message`` and the value returned by
        ``update_did_routing`` under ``voipms_response``.

    Any HTTPException raised by ``update_did_routing`` propagates unchanged.
    """
    account_id = '407323'
    routing_string = f"account:{account_id}"  # 'account:407323'
    result = update_did_routing(did=settings.target_did, routing=routing_string)

    db = Session()
    try:
        db.add(Call(current_phone=account_id))
        db.commit()
    finally:
        # Release the session even when the insert or commit fails.
        db.close()

    return {
        "message": f"Successfully routed DID {settings.target_did} to account {account_id}",
        "voipms_response": result,
    }
|
|
|
|
@app.post("/route/sip", status_code=status.HTTP_200_OK, tags=["DID Routing"])
def route_to_sip_account():
    """
    Routes the target DID to the pre-configured SIP account.

    The account ID is the part of ``settings.target_sip_account`` before the
    first '@'. After the routing update, a Call row is stored with that
    account ID as its ``current_phone``.

    Returns:
        A dict with a confirmation ``message`` and the value returned by
        ``update_did_routing`` under ``voipms_response``.

    Any HTTPException raised by ``update_did_routing`` propagates unchanged.
    """
    # Use sub-account ID from TARGET_SIP_ACCOUNT
    sip_account_id = settings.target_sip_account.split('@')[0]  # e.g., '407323_auburnoil'
    routing_string = f"account:{sip_account_id}"  # e.g., 'account:407323_auburnoil'
    result = update_did_routing(did=settings.target_did, routing=routing_string)

    db = Session()
    try:
        db.add(Call(current_phone=sip_account_id))
        db.commit()
    finally:
        # Release the session even when the insert or commit fails.
        db.close()

    return {
        "message": f"Successfully routed DID {settings.target_did} to SIP account {settings.target_sip_account}",
        "voipms_response": result,
    }
|
|
|
|
|
|
@app.post("/route/cellphone1", status_code=status.HTTP_200_OK, tags=["DID Routing"])
def route_to_cellphone_1():
    """
    Routes the target DID to the pre-configured Cellphone #1.

    Looks up the forwarding entry for ``settings.target_cellphone_1``, points
    the DID at it, then stores a Call row with the phone number as its
    ``current_phone``.

    Returns:
        A dict with a confirmation ``message`` and the value returned by
        ``update_did_routing`` under ``voipms_response``.

    Raises:
        HTTPException: 404 when the forwarding lookup yields no forwarding
            ID. HTTPExceptions raised by ``get_forwardings`` or
            ``update_did_routing`` propagate unchanged.
    """
    # Get the forwarding entry for the phone number
    forwarding = get_forwardings(phone_number=settings.target_cellphone_1)
    forwarding_id = forwarding.get("forwarding")  # e.g., '998692'
    if not forwarding_id:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"No forwarding ID found for phone number {settings.target_cellphone_1}",
        )

    routing_string = f"fwd:{forwarding_id}"  # e.g., 'fwd:998692'
    result = update_did_routing(did=settings.target_did, routing=routing_string)

    db = Session()
    try:
        db.add(Call(current_phone=settings.target_cellphone_1))
        db.commit()
    finally:
        # Release the session even when the insert or commit fails.
        db.close()

    return {
        "message": f"Successfully routed DID {settings.target_did} to Cellphone #1 ({settings.target_cellphone_1})",
        "voipms_response": result,
    }
|
|
|
|
@app.post("/route/cellphone2", status_code=status.HTTP_200_OK, tags=["DID Routing"])
def route_to_cellphone_2():
    """
    Routes the target DID to the pre-configured Cellphone #2.

    Looks up the forwarding entry for ``settings.target_cellphone_2``, points
    the DID at it, then stores a Call row with the phone number as its
    ``current_phone``.

    Returns:
        A dict with a confirmation ``message`` and the value returned by
        ``update_did_routing`` under ``voipms_response``.

    Raises:
        HTTPException: 404 when the forwarding lookup yields no forwarding
            ID. HTTPExceptions raised by ``get_forwardings`` or
            ``update_did_routing`` propagate unchanged.
    """
    # Get the forwarding entry for the phone number
    forwarding = get_forwardings(phone_number=settings.target_cellphone_2)
    forwarding_id = forwarding.get("forwarding")  # e.g., ID for 9143306100
    if not forwarding_id:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"No forwarding ID found for phone number {settings.target_cellphone_2}",
        )

    routing_string = f"fwd:{forwarding_id}"  # e.g., 'fwd:<ID>'
    result = update_did_routing(did=settings.target_did, routing=routing_string)

    db = Session()
    try:
        db.add(Call(current_phone=settings.target_cellphone_2))
        db.commit()
    finally:
        # Release the session even when the insert or commit fails.
        db.close()

    return {
        "message": f"Successfully routed DID {settings.target_did} to Cellphone #2 ({settings.target_cellphone_2})",
        "voipms_response": result,
    }
|
|
|
|
|
|
@app.on_event("startup")
async def startup_event():
    """Application startup - log configuration and test DB connection.

    Logs the start banner, the active mode, the database target with
    everything up to the last '@' removed (so credentials are not logged),
    the number of configured CORS origins, and the result of
    ``check_db_connection``. A failed check is logged at ERROR level; startup
    continues either way.
    """
    logger.info("🚀 eamco_voipms STARTING")
    if mode in ('DEVELOPMENT', 'DEV'):
        logger.info("🤖🤖🤖🤖🤖 Mode: Development 🤖🤖🤖🤖🤖")
    elif mode in ('PRODUCTION', 'PROD'):
        logger.info("💀💀💀💀💀💀💀💀💀💀 ⚠️ WARNING PRODUCTION 💀💀💀💀💀💀💀💀💀💀")
    else:
        # Other modes (e.g. LOCAL) previously produced no mode line at all.
        logger.info("Mode: %s", mode)

    # Sanitize DB URI to avoid logging credentials
    db_uri = settings.SQLALCHEMY_DATABASE_URI
    if '@' in db_uri:
        db_uri = db_uri.split('@')[-1]
    logger.info("DB: ...@%s", db_uri[:50])
    logger.info("CORS: %d origins configured", len(settings.origins))

    # Test database connection
    if check_db_connection():
        logger.info("DB Connection: ✅ OK")
    else:
        # A failed connectivity check is an error condition, not routine info.
        logger.error("DB Connection: ❌ FAILED")
|