Building a Geo-Aware Rate Limiter with Redis and IP Intelligence
Static rate limiting (e.g., 100 requests/minute per IP) is a blunt instrument. It often penalizes legitimate power users behind NATs while failing to stop low-and-slow attacks from distributed botnets.
A superior approach is Geo-Aware Rate Limiting. By integrating IP intelligence, you can apply variable throttling policies based on the request's origin context: geolocation, ASN, and connection type (VPN, Tor, Proxy).
This guide demonstrates how to build a dynamic rate limiter using Python, Redis, and the IPASIS API.
The Architecture
The logic flow moves from a static threshold to a tiered risk assessment:
- Ingress: Request hits the middleware.
- Enrichment: Query IPASIS for metadata (Country, ISP, Proxy Status).
- Tier Assignment: Map metadata to a policy tier (e.g., Trusted, Standard, Suspicious).
- Throttling: Check Redis for the tier-specific limit.
Policy Tiers Example
- Tier 1 (Trusted): Domestic Residential IPs. Limit:
200 req/min. - Tier 2 (Standard): RoW (Rest of World) Residential. Limit:
50 req/min. - Tier 3 (High Risk): Detected VPNs, Datacenters, or High-Risk Countries. Limit:
5 req/min.
Implementation
We will use Python with redis-py for the token bucket algorithm and requests to fetch IP metadata.
Prerequisites:
- Redis instance
- IPASIS API Key
Step 1: The IP Intelligence Wrapper
First, implement a caching wrapper for the IP lookup to minimize latency. IP metadata is static for short durations; caching it for 1-6 hours is standard practice.
import requests
import json
import redis
# Configuration
IPASIS_API_KEY = 'your_api_key'
REDIS_HOST = 'localhost'
CACHE_TTL = 3600 # 1 hour cache for IP data
r = redis.Redis(host=REDIS_HOST, port=6379, db=0)
def get_ip_intelligence(ip_address):
# Check local cache first
cache_key = f"ip_meta:{ip_address}"
cached_data = r.get(cache_key)
if cached_data:
return json.loads(cached_data)
# Fetch from IPASIS
try:
response = requests.get(f"https://api.ipasis.com/v1/{ip_address}?key={IPASIS_API_KEY}")
if response.status_code == 200:
data = response.json()
# Cache the result
r.setex(cache_key, CACHE_TTL, json.dumps(data))
return data
except Exception as e:
# Fallback open or closed depending on security posture
print(f"Lookup failed: {e}")
return None
Step 2: Determining Limits
Define the logic that assigns rate limits based on the enriched data.
def determine_limit(ip_data):
if not ip_data:
return 20 # Fallback safe limit
# Tier 3: High Risk (VPNs, Proxies, Tor)
if ip_data.get('security', {}).get('is_vpn') or ip_data.get('security', {}).get('is_proxy'):
return 5
country = ip_data.get('location', {}).get('country_code')
# Tier 1: Primary Market (e.g., US, CA, UK)
target_markets = ['US', 'CA', 'GB']
if country in target_markets:
return 200
# Tier 2: Standard Global Traffic
return 50
Step 3: Redis Rate Limiter (Fixed Window)
Execute the check using atomic Redis operations.
def check_rate_limit(ip_address):
# 1. Get Intelligence
ip_data = get_ip_intelligence(ip_address)
# 2. Define Limit
limit = determine_limit(ip_data)
window_seconds = 60
# 3. Redis Atomic Increment
bucket = f"rate_limit:{ip_address}"
# Pipeline ensures atomicity of increment and expire
pipe = r.pipeline()
pipe.incr(bucket)
pipe.expire(bucket, window_seconds)
result = pipe.execute()
current_count = result[0]
if current_count > limit:
return False, f"Rate limit exceeded. Tier limit: {limit}"
return True, "Allowed"
# Simulation
client_ip = "1.2.3.4"
is_allowed, message = check_rate_limit(client_ip)
print(f"Request status: {message}")
Performance Considerations
- Asynchronous Lookup: In production (Node.js/Go/FastAPI), the IP lookup should be non-blocking. If the IP is not in the local Redis cache, consider allowing the first request while fetching metadata in the background to prevent latency spikes on the first hit.
- Fail-Open vs. Fail-Closed: If the IP intelligence API times out, default to a strict "Safe Tier" (e.g., 20 req/min) rather than blocking traffic entirely.
- ASN Blocking: For B2B applications, you may want to whitelist specific ASNs (e.g., Google Bot, Salesforce) regardless of rate limits.
FAQ
Q: Does this add significant latency? By caching the IP metadata in Redis (or local memory) for 1+ hours, the API lookup cost is amortized across thousands of requests. The latency impact on cached hits is sub-millisecond.
Q: How do I handle IPv6?
Rate limiting single IPv6 addresses is ineffective due to the vast address space. Always aggregate IPv6 addresses by their /64 subnet range when generating the Redis key.
Q: Can I use this for login protection?
Yes. Geo-aware limiting is excellent for credential stuffing protection. You can enforce a limit of 3 failed logins/hour for foreign VPNs while allowing 10 failed logins/hour for domestic residential IPs.
Secure Your Perimeter with IPASIS
Sophisticated attacks require sophisticated defenses. Don't rely on flat rate limits that block customers and admit bots.
Integrate IPASIS today to detect proxies, VPNs, and high-risk locations with millisecond precision.
Get your free API Key and start building smarter defenses.