| | import os |
| | import tempfile |
| | import numpy as np |
| | import cv2 |
| | from pathlib import Path |
| | import logging |
| | from transformers import DepthProImageProcessorFast, DepthProForDepthEstimation |
| | import torch |
| | from PIL import Image |
| | from fastapi import FastAPI, File, UploadFile, Form, HTTPException |
| | from fastapi.responses import JSONResponse, HTMLResponse |
| | from typing import Any, Dict, List, Tuple, Union |
| | import pillow_heif |
| | import json |
| |
|
| | from depth_pro.utils import load_rgb, extract_exif |
| |
|
| |
|
| | |
# Device string that the model is moved to when the pipeline is loaded.
device = 'cpu'

# Application object; the interactive documentation is exposed at /docs
# (Swagger UI) and /redoc.
app = FastAPI(
    title="Depth Pro Distance Estimation",
    description="Estimate distance and depth using Apple's Depth Pro model",
    version="1.0.0",
    docs_url="/docs",
    redoc_url="/redoc",
)
| |
|
def initialize_depth_pipeline():
    """Load the Depth Pro model and its image processor.

    Both objects are created with ``from_pretrained("apple/DepthPro-hf")``
    and the model is moved to the module-level ``device``.

    Returns:
        A ``(model, image_processor)`` pair. If anything goes wrong while
        loading, the error is printed and ``(None, None)`` is returned so
        that callers can always unpack the result into two names.
    """
    try:
        print("Initializing Depth Pro pipeline...")
        image_processor = DepthProImageProcessorFast.from_pretrained("apple/DepthPro-hf")
        model = DepthProForDepthEstimation.from_pretrained("apple/DepthPro-hf").to(device)
        return model, image_processor
    except Exception as e:
        print(f"Error initializing pipeline: {e}")
        print("Falling back to dummy pipeline...")
        # A bare ``None`` here would make ``model, processor = ...`` raise
        # TypeError at the call site, defeating the fallback entirely.
        return None, None
| |
|
| |
|
class DepthEstimator:
    """Run Depth Pro inference on image files using a preloaded model."""

    def __init__(self, model=None, image_processor=None):
        """Store the model and processor used for inference.

        Args:
            model: Depth estimation model, or ``None`` when loading failed.
            image_processor: Processor paired with ``model``, or ``None``.
        """
        self.device = torch.device('cpu')
        print("Initializing Depth Pro estimator...")
        self.model = model
        self.image_processor = image_processor
        print("Depth Pro estimator initialized successfully!")

    def estimate_depth(self, image_path):
        """Estimate a depth map for the image stored at ``image_path``.

        The image is first resized with :meth:`resize_image`; the resized
        copy is what gets fed to the model, and it is deleted again before
        this method returns.

        Args:
            image_path: Path of the image file to analyse.

        Returns:
            ``(depth, new_size, focal_length)`` where ``depth`` is a NumPy
            array, ``new_size`` is the ``(width, height)`` of the resized
            image and ``focal_length`` is the value reported by the
            processor's post-processing step. On any failure the error is
            printed and ``(None, None, None)`` is returned.
        """
        if self.model is None or self.image_processor is None:
            # Nothing to run; skip the resize and temp-file work entirely.
            print("Error in depth estimation: model is not initialized")
            return None, None, None

        resized_path = None
        try:
            resized_file, new_size = self.resize_image(image_path)
            resized_path = resized_file.name

            # load_rgb returns a sequence; the image comes first and the
            # focal length (in pixels) read from the file comes last.
            rgb_image = load_rgb(resized_path)
            f_px = rgb_image[-1]
            eval_image = rgb_image[0]

            inputs = self.image_processor(eval_image, return_tensors="pt").to(self.device)
            with torch.no_grad():
                outputs = self.model(**inputs)
            # new_size is (width, height); target_sizes wants (height, width).
            post_processed_output = self.image_processor.post_process_depth_estimation(
                outputs, target_sizes=[(new_size[1], new_size[0])],
            )
            result = post_processed_output[0]
            focal_length = result["focal_length"]
            depth = result["predicted_depth"]

            if isinstance(depth, torch.Tensor):
                depth = depth.detach().cpu().numpy()
            elif not isinstance(depth, np.ndarray):
                depth = np.array(depth)

            print(f_px, focal_length)

            return depth, new_size, focal_length

        except Exception as e:
            print(f"Error in depth estimation: {e}")
            return None, None, None
        finally:
            # resize_image creates its file with delete=False, so it has to
            # be removed here or every request leaves a PNG behind.
            if resized_path is not None:
                try:
                    os.unlink(resized_path)
                except OSError:
                    pass  # best-effort cleanup; the file may already be gone

    def resize_image(self, image_path, max_size=1536):
        """Write a resized PNG copy of an image to a temporary file.

        The image is scaled so that its longest side equals ``max_size``
        (smaller images are scaled up as well), keeping the aspect ratio.

        Args:
            image_path: Path of the source image.
            max_size: Target length in pixels of the longest side.

        Returns:
            ``(temp_file, new_size)``: the (already closed) temporary file
            object, whose ``.name`` is the path of the PNG, and the
            ``(width, height)`` of the resized image. The file is created
            with ``delete=False``; the caller is responsible for removing it.
        """
        with Image.open(image_path) as img:
            ratio = max_size / max(img.size)
            # Clamp to 1 so an extreme aspect ratio cannot truncate a side to 0.
            new_size = (
                max(1, int(img.size[0] * ratio)),
                max(1, int(img.size[1] * ratio)),
            )
            resized = img.resize(new_size, Image.Resampling.LANCZOS)
        with tempfile.NamedTemporaryFile(delete=False, suffix=".png") as temp_file:
            resized.save(temp_file, format="PNG")
        return temp_file, new_size
| | |
| |
|
def find_topmost_pixel(mask):
    """Return the middle pixel of the highest row that contains mask pixels.

    Any value greater than zero counts as part of the mask. The result is a
    ``(row, column)`` pair, or ``None`` when the mask has no such pixel.
    """
    nonzero = np.where(mask > 0)
    rows, cols = nonzero[0], nonzero[1]
    if rows.size == 0:
        return None

    top_row = np.min(rows)
    # Columns of every mask pixel on that row; take the one in the middle.
    candidates = cols[rows == top_row]
    middle = candidates[len(candidates) // 2]
    return (top_row, middle)
| |
|
def find_bottommost_footpath_pixel(mask, topmost_pixel):
    """Return the lowest mask pixel in the same column as ``topmost_pixel``.

    ``topmost_pixel`` is a ``(row, column)`` pair. If its column holds no
    mask pixels at all, the middle pixel of the lowest mask row is returned
    instead. ``None`` is returned when ``topmost_pixel`` is ``None`` or the
    mask is empty.
    """
    if topmost_pixel is None:
        return None

    _, column = topmost_pixel
    rows, cols = np.where(mask > 0)

    rows_in_column = rows[cols == column]
    if len(rows_in_column) != 0:
        return (np.max(rows_in_column), column)

    # Fallback: nothing in that column, so use the lowest row of the mask.
    if len(rows) == 0:
        return None
    lowest_row = np.max(rows)
    candidates = cols[rows == lowest_row]
    return (lowest_row, candidates[len(candidates) // 2])
| |
|
| |
|
def estimate_real_world_distance(depth_map, topmost_pixel, mask):
    """Return the depth difference between the top and bottom of the mask.

    The bottom pixel is looked up with ``find_bottommost_footpath_pixel``;
    the result is ``depth_map[top] - depth_map[bottom]``.

    Args:
        depth_map: 2-D array of depth values indexed as ``[row, column]``.
        topmost_pixel: ``(row, column)`` of the top mask pixel, or ``None``.
        mask: Mask array handed to ``find_bottommost_footpath_pixel``.

    Returns:
        The difference as a ``float``, or ``None`` when an input is missing,
        no bottom pixel exists, a pixel lies outside the depth map, or a
        sampled depth is NaN or infinite.
    """
    if topmost_pixel is None or depth_map is None:
        return None

    bottommost_pixel = find_bottommost_footpath_pixel(mask, topmost_pixel)
    if bottommost_pixel is None:
        return None

    top_y, top_x = topmost_pixel
    bottom_y, bottom_x = bottommost_pixel

    height, width = depth_map.shape[0], depth_map.shape[1]
    # Reject negative indices too: NumPy would silently wrap them around.
    inside = (
        0 <= top_y < height and 0 <= top_x < width
        and 0 <= bottom_y < height and 0 <= bottom_x < width
    )
    if not inside:
        return None

    topmost_depth = depth_map[top_y, top_x]
    bottommost_depth = depth_map[bottom_y, bottom_x]

    # NaN and +/-inf alike would make the difference meaningless.
    if not (np.isfinite(topmost_depth) and np.isfinite(bottommost_depth)):
        print("Invalid depth values (NaN or infinite) found")
        return None

    distance_meters = float(topmost_depth - bottommost_depth)

    print("Distance calculation:")
    print(f" Topmost pixel: ({top_y}, {top_x}) = {topmost_depth:.3f}m")
    print(f" Bottommost pixel: ({bottom_y}, {bottom_x}) = {bottommost_depth:.3f}m")
    print(f" Distance: {distance_meters:.3f}m")

    return distance_meters
| |
|
| |
|
| |
|
| |
|
| |
|
| | |
print("Initializing Depth Pro pipeline...")
# Guard the unpacking: if initialization yields None instead of a
# (model, processor) pair, fall back to an empty pair so the service can
# still start and report failures per request.
depth_model, image_processor = initialize_depth_pipeline() or (None, None)
depth_estimator = DepthEstimator(depth_model, image_processor)
| |
|
@app.get("/health")
async def health_check():
    """Report that the service is up (health probe used by Docker)."""
    status_payload = {
        "status": "healthy",
        "service": "Depth Pro Distance Estimation",
    }
    return status_payload
| |
|
@app.get("/api")
async def api_info():
    """Describe the API: its name and the paths of its main endpoints."""
    endpoint_paths = {
        "docs": "/docs",
        "health": "/health",
        "estimate_endpoint": "/estimate-depth",
    }
    return {"message": "Depth Pro Distance Estimation API", **endpoint_paths}
| |
|
async def _save_upload_to_temp(upload, suffix=".jpg"):
    """Write an uploaded file to a named temporary file and return its path."""
    payload = await upload.read()
    with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as handle:
        handle.write(payload)
        return handle.name


def _remove_temp_file(path):
    """Delete ``path`` if one was created; a missing file is not an error."""
    if path is None:
        return
    try:
        os.unlink(path)
    except OSError:
        pass  # best-effort cleanup only


@app.post("/estimate-depth")
async def estimate_depth_endpoint(file: UploadFile = File(...), mask: UploadFile = File(...)):
    """Estimate depth for an uploaded image and measure along its mask.

    Both uploads are written to temporary files, the depth map is computed
    for the image, the mask is resized to the depth map's working size, and
    the depth difference between the top and bottom mask pixels is reported.
    The temporary files are removed on every exit path.

    Args:
        file: The image to analyse.
        mask: The footpath mask image; values greater than zero are mask.

    Returns:
        A JSON response with ``depth_map_shape``, ``focal_length_px``,
        ``topmost_pixel``, ``distance_meters`` and ``depth_stats``. Status
        400 is returned when either upload cannot be decoded, and 500 when
        depth estimation fails or any other error occurs.
    """
    image_path = None
    mask_path = None
    try:
        image_path = await _save_upload_to_temp(file)
        mask_path = await _save_upload_to_temp(mask)

        # The image is decoded here only to validate the upload.
        image = cv2.imread(image_path)
        mask_image = cv2.imread(mask_path)
        if image is None or mask_image is None:
            return JSONResponse(
                status_code=400,
                content={"error": "Could not load image or mask"}
            )

        depth_map, new_size, focal_length_px = depth_estimator.estimate_depth(image_path)
        if depth_map is None:
            return JSONResponse(
                status_code=500,
                content={"error": "Depth estimation failed"}
            )

        # Nearest-neighbour keeps the mask's original values; interpolating
        # would create non-zero edge pixels that enlarge the mask.
        resized_mask = cv2.resize(mask_image, new_size, interpolation=cv2.INTER_NEAREST)
        if len(resized_mask.shape) == 3:
            resized_mask = cv2.cvtColor(resized_mask, cv2.COLOR_BGR2GRAY)

        topmost_pixel = find_topmost_pixel(resized_mask)
        distance_meters = estimate_real_world_distance(depth_map, topmost_pixel, resized_mask)

        result = {
            "depth_map_shape": depth_map.shape,
            "focal_length_px": float(focal_length_px) if focal_length_px is not None else None,
            "topmost_pixel": (
                [int(topmost_pixel[0]), int(topmost_pixel[1])]
                if topmost_pixel is not None else None
            ),
            "distance_meters": distance_meters,
            "depth_stats": {
                "min_depth": float(np.min(depth_map)),
                "max_depth": float(np.max(depth_map)),
                "mean_depth": float(np.mean(depth_map))
            }
        }

        return JSONResponse(content=result)

    except Exception as e:
        return JSONResponse(
            status_code=500,
            content={"error": str(e)}
        )
    finally:
        # Runs for the success path, both early returns and the error path.
        _remove_temp_file(image_path)
        _remove_temp_file(mask_path)
| |
|
@app.get("/", response_class=HTMLResponse)
async def root():
    """Serve the single-page HTML interface.

    The page offers a form with two file inputs (image and footpath mask),
    posts them to ``/estimate-depth`` and renders the JSON result.
    """
    # NOTE(review): several emoji in this markup arrived mis-decoded; the
    # icons below are a best-effort restoration - confirm against the
    # intended design.
    html_content = """
<!DOCTYPE html>
<html>
<head>
    <meta charset="utf-8">
    <title>Depth Pro Distance Estimation</title>
    <style>
        body {
            font-family: Arial, sans-serif;
            max-width: 800px;
            margin: 0 auto;
            padding: 20px;
            background-color: #f5f5f5;
        }
        .container {
            background-color: white;
            padding: 30px;
            border-radius: 10px;
            box-shadow: 0 2px 10px rgba(0,0,0,0.1);
        }
        h1 {
            color: #2c3e50;
            text-align: center;
            margin-bottom: 10px;
        }
        .subtitle {
            text-align: center;
            color: #7f8c8d;
            margin-bottom: 30px;
        }
        .upload-section {
            border: 2px dashed #3498db;
            border-radius: 10px;
            padding: 30px;
            text-align: center;
            margin: 20px 0;
            background-color: #ecf0f1;
        }
        input[type="file"] {
            margin: 10px 0;
            padding: 10px;
            border: 1px solid #bdc3c7;
            border-radius: 5px;
        }
        .file-group {
            margin: 20px 0;
        }
        .file-label {
            display: block;
            margin-bottom: 8px;
            font-weight: bold;
            color: #2c3e50;
        }
        button {
            background-color: #3498db;
            color: white;
            padding: 12px 25px;
            border: none;
            border-radius: 5px;
            cursor: pointer;
            font-size: 16px;
        }
        button:hover {
            background-color: #2980b9;
        }
        .results {
            margin-top: 20px;
            padding: 20px;
            border-radius: 5px;
            background-color: #e8f5e8;
            display: none;
        }
        .error {
            background-color: #ffeaa7;
            border-left: 4px solid #fdcb6e;
            padding: 10px;
            margin: 10px 0;
        }
        .endpoint-info {
            background-color: #74b9ff;
            color: white;
            padding: 15px;
            border-radius: 5px;
            margin: 20px 0;
        }
        .feature {
            margin: 10px 0;
            padding: 10px;
            border-left: 3px solid #3498db;
            background-color: #f8f9fa;
        }
    </style>
</head>
<body>
    <div class="container">
        <h1>📏 Depth Pro Distance Estimation</h1>
        <p class="subtitle">Upload an image and a footpath mask to estimate depth and calculate distances using Apple's Depth Pro model</p>

        <div class="upload-section">
            <h3>Upload Image and Mask</h3>
            <form id="uploadForm" enctype="multipart/form-data">
                <div style="margin: 20px 0;">
                    <label for="imageFile" style="display: block; margin-bottom: 5px; font-weight: bold;">📸 Main Image:</label>
                    <input type="file" id="imageFile" name="file" accept="image/*" required style="width: 100%;">
                </div>
                <div style="margin: 20px 0;">
                    <label for="maskFile" style="display: block; margin-bottom: 5px; font-weight: bold;">🎭 Footpath Mask:</label>
                    <input type="file" id="maskFile" name="mask" accept="image/*" required style="width: 100%;">
                </div>
                <button type="submit">Analyze Image with Mask</button>
            </form>

            <div id="results" class="results">
                <h3>Analysis Results:</h3>
                <div id="resultsContent"></div>
            </div>
        </div>

        <div class="endpoint-info">
            <h3>🔗 API Endpoints</h3>
            <p><strong>POST /estimate-depth</strong> - Upload image and footpath mask for depth estimation</p>
            <p><strong>GET /docs</strong> - API documentation</p>
            <p><strong>GET /health</strong> - Health check</p>
        </div>

        <div class="feature">
            <h3>✨ Features</h3>
            <ul>
                <li>🎯 Monocular depth estimation using Depth Pro</li>
                <li>🎭 Footpath mask-based analysis</li>
                <li>📏 Real-world distance calculation between mask boundaries</li>
                <li>🖥️ CPU-optimized processing</li>
                <li>🚀 Fast inference suitable for real-time use</li>
            </ul>
        </div>
    </div>

    <script>
        // Zero is a legitimate measurement, so test for a finite number
        // instead of relying on truthiness.
        function formatNumber(value, digits, unit) {
            return (typeof value === 'number' && isFinite(value))
                ? value.toFixed(digits) + unit
                : 'N/A';
        }

        // Error text comes from the server or the browser; insert it with
        // textContent so it is never parsed as HTML.
        function showError(container, message) {
            const box = document.createElement('div');
            box.className = 'error';
            box.textContent = message;
            container.innerHTML = '';
            container.appendChild(box);
        }

        document.getElementById('uploadForm').addEventListener('submit', async function(e) {
            e.preventDefault();

            const fileInput = document.getElementById('imageFile');
            const maskInput = document.getElementById('maskFile');
            const resultsDiv = document.getElementById('results');
            const resultsContent = document.getElementById('resultsContent');

            if (!fileInput.files[0]) {
                alert('Please select a main image file');
                return;
            }

            if (!maskInput.files[0]) {
                alert('Please select a footpath mask file');
                return;
            }

            const formData = new FormData();
            formData.append('file', fileInput.files[0]);
            formData.append('mask', maskInput.files[0]);

            try {
                resultsContent.innerHTML = '<p>🔄 Processing image and mask...</p>';
                resultsDiv.style.display = 'block';

                const response = await fetch('/estimate-depth', {
                    method: 'POST',
                    body: formData
                });

                if (response.ok) {
                    const result = await response.json();

                    let html = '<h4>📊 Results:</h4>';
                    html += `<p><strong>📏 Distance:</strong> ${formatNumber(result.distance_meters, 3, ' meters')}</p>`;
                    html += `<p><strong>🎯 Focal Length:</strong> ${formatNumber(result.focal_length_px, 2, ' pixels')}</p>`;
                    html += `<p><strong>📐 Depth Map Shape:</strong> ${result.depth_map_shape ? result.depth_map_shape.join(' x ') : 'N/A'}</p>`;
                    html += `<p><strong>📍 Top Mask Pixel:</strong> ${result.topmost_pixel ? `(${result.topmost_pixel[0]}, ${result.topmost_pixel[1]})` : 'N/A'}</p>`;

                    if (result.depth_stats) {
                        html += '<h4>📈 Depth Statistics:</h4>';
                        html += `<p><strong>Min Depth:</strong> ${formatNumber(result.depth_stats.min_depth, 3, 'm')}</p>`;
                        html += `<p><strong>Max Depth:</strong> ${formatNumber(result.depth_stats.max_depth, 3, 'm')}</p>`;
                        html += `<p><strong>Mean Depth:</strong> ${formatNumber(result.depth_stats.mean_depth, 3, 'm')}</p>`;
                    }

                    resultsContent.innerHTML = html;
                } else {
                    const error = await response.json();
                    showError(resultsContent, '❌ Error: ' + (error.error || 'Processing failed'));
                }
            } catch (error) {
                showError(resultsContent, '❌ Network error: ' + error.message);
            }
        });
    </script>
</body>
</html>
"""
    return HTMLResponse(content=html_content)
| |
|
| |
|
| | |
if __name__ == "__main__":
    import uvicorn

    # Listen on every interface, port 7860, with request logging enabled.
    server_options = {
        "host": "0.0.0.0",
        "port": 7860,
        "log_level": "info",
        "access_log": True,
    }
    uvicorn.run(app, **server_options)
| |
|