enrichpythonone-off✓ IRON 90hand-curated
Shodan → Microsoft Sentinel
Enrich IPs from a Sentinel watchlist with Shodan host data
One-off run: reads a list of IPs from a Sentinel watchlist via the Logs Query API, queries Shodan for host details on each, and writes results back to a custom log table.
shodansentinelipenrichmentone-offgenericmoderate
# ============================================================
# RINOX INTEGRATION: Shodan -> Microsoft Sentinel (one-off enrichment)
# Generated by Rinox (rinox.io)
# Use Case: Enrich Sentinel watchlist IPs with Shodan host data
# Language: python
# ============================================================
# ============================================================
# SECTION 1: LOGGING
# ============================================================
import logging
import os
import sys
import json
import time
from datetime import datetime, timezone
import requests
logging.basicConfig(
level=os.environ.get("LOG_LEVEL", "INFO"),
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
log = logging.getLogger("rinox.shodan_to_sentinel")
# ============================================================
# SECTION 2: AUTHENTICATION
# ============================================================
SHODAN_API_KEY = os.environ.get("SHODAN_API_KEY", "")
AZURE_TENANT_ID = os.environ.get("AZURE_TENANT_ID", "")
AZURE_CLIENT_ID = os.environ.get("AZURE_CLIENT_ID", "")
AZURE_CLIENT_SECRET = os.environ.get("AZURE_CLIENT_SECRET", "")
SENTINEL_WORKSPACE_ID = os.environ.get("SENTINEL_WORKSPACE_ID", "")
SENTINEL_WATCHLIST = os.environ.get("SENTINEL_WATCHLIST_NAME", "")
SENTINEL_IP_COLUMN = os.environ.get("SENTINEL_IP_COLUMN", "IPAddress")
SENTINEL_DCE = os.environ.get("SENTINEL_DCE_URL", "").rstrip("/")
SENTINEL_DCR_ID = os.environ.get("SENTINEL_DCR_IMMUTABLE_ID", "")
SENTINEL_STREAM = os.environ.get("SENTINEL_STREAM_NAME", "Custom-ShodanHostEnrichment_CL")
RATE_LIMIT_SLEEP = float(os.environ.get("RATE_LIMIT_SLEEP", "1.0"))
REQUIRED = {
"SHODAN_API_KEY": SHODAN_API_KEY,
"AZURE_TENANT_ID": AZURE_TENANT_ID, "AZURE_CLIENT_ID": AZURE_CLIENT_ID,
"AZURE_CLIENT_SECRET": AZURE_CLIENT_SECRET,
"SENTINEL_WORKSPACE_ID": SENTINEL_WORKSPACE_ID,
"SENTINEL_WATCHLIST_NAME": SENTINEL_WATCHLIST,
"SENTINEL_DCE_URL": SENTINEL_DCE, "SENTINEL_DCR_IMMUTABLE_ID": SENTINEL_DCR_ID,
}
missing = [k for k, v in REQUIRED.items() if not v]
if missing:
log.error("Missing env vars: %s", ", ".join(missing))
sys.exit(1)
def azure_token(scope):
resp = requests.post(
f"https://login.microsoftonline.com/{AZURE_TENANT_ID}/oauth2/v2.0/token",
data={"grant_type": "client_credentials", "client_id": AZURE_CLIENT_ID,
"client_secret": AZURE_CLIENT_SECRET, "scope": scope},
timeout=30,
)
resp.raise_for_status()
return resp.json()["access_token"]
# ============================================================
# SECTION 3: SOURCE SYSTEM CALLS (Sentinel watchlist + Shodan)
# ============================================================
def fetch_watchlist_ips():
token = azure_token("https://api.loganalytics.io/.default")
kql = f"_GetWatchlist('{SENTINEL_WATCHLIST}') | project {SENTINEL_IP_COLUMN}"
resp = requests.post(
f"https://api.loganalytics.io/v1/workspaces/{SENTINEL_WORKSPACE_ID}/query",
headers={"Authorization": f"Bearer {token}", "Content-Type": "application/json"},
json={"query": kql}, timeout=60,
)
resp.raise_for_status()
body = resp.json()
rows = body.get("tables", [{}])[0].get("rows", [])
return [r[0] for r in rows if r and r[0]]
def shodan_lookup(ip):
resp = requests.get(
f"https://api.shodan.io/shodan/host/{ip}",
params={"key": SHODAN_API_KEY},
timeout=30,
)
return resp.status_code, resp.json() if resp.headers.get("content-type", "").startswith("application/json") else {}
# ============================================================
# SECTION 4: TRANSLATION
# ============================================================
def translate(ip, shodan_data):
return {
"TimeGenerated": datetime.now(timezone.utc).isoformat(),
"IPAddress": ip,
"Country": shodan_data.get("country_name"),
"ASN": shodan_data.get("asn"),
"Org": shodan_data.get("org"),
"OS": shodan_data.get("os"),
"Ports": shodan_data.get("ports") or [],
"Hostnames": shodan_data.get("hostnames") or [],
"Vulns": list((shodan_data.get("vulns") or {}).keys()) if isinstance(shodan_data.get("vulns"), dict) else (shodan_data.get("vulns") or []),
"LastUpdate": shodan_data.get("last_update"),
"Tags": shodan_data.get("tags") or [],
}
# ============================================================
# SECTION 5: TARGET SYSTEM CALLS (Sentinel Logs Ingestion)
# ============================================================
def deliver(records):
if not records:
return True
token = azure_token("https://monitor.azure.com//.default")
url = f"{SENTINEL_DCE}/dataCollectionRules/{SENTINEL_DCR_ID}/streams/{SENTINEL_STREAM}?api-version=2023-01-01"
resp = requests.post(
url,
headers={"Authorization": f"Bearer {token}", "Content-Type": "application/json"},
data=json.dumps(records), timeout=60,
)
if resp.status_code >= 300:
log.error("Sentinel delivery failed: %s %s", resp.status_code, resp.text[:300])
return False
return True
# ============================================================
# SECTION 6: RINOX
# ============================================================
RINOX_RUN_ID = str(int(time.time()))
log.info("Rinox run started: id=%s", RINOX_RUN_ID)
# ============================================================
# SECTION 7: MAIN ORCHESTRATOR (one-off)
# ============================================================
def main():
try:
ips = fetch_watchlist_ips()
except requests.RequestException as e:
log.error("Watchlist fetch failed: %s", e)
sys.exit(1)
log.info("Loaded %d IPs from watchlist %s", len(ips), SENTINEL_WATCHLIST)
records = []
not_found = 0
errors = 0
for ip in ips:
try:
code, data = shodan_lookup(ip)
except requests.RequestException as e:
log.error("Shodan request failed for %s: %s", ip, e)
errors += 1
continue
if code == 404:
not_found += 1
elif code == 429:
log.warning("Shodan rate-limited; sleeping 30s")
time.sleep(30)
elif code >= 300:
log.error("Shodan %s for %s: %s", code, ip, str(data)[:200])
errors += 1
else:
records.append(translate(ip, data))
time.sleep(RATE_LIMIT_SLEEP)
if not deliver(records):
log.error("Delivery failed.")
sys.exit(1)
log.info("Run complete: enriched=%d not_found=%d errors=%d", len(records), not_found, errors)
if __name__ == "__main__":
main()
Useful?
Used by 0 teams · Viewed 4 times · Last validated 5/17/2026