rinoxRinox
enrich · python · one-off · ✓ IRON 90 · hand-curated

Shodan → Microsoft Sentinel

Enrich IPs from a Sentinel watchlist with Shodan host data

One-off run: reads a list of IPs from a Sentinel watchlist via the Logs Query API, queries Shodan for host details on each, and writes results back to a custom log table.

shodan · sentinel · ip · enrichment · one-off · generic · moderate
# ============================================================
# RINOX INTEGRATION: Shodan -> Microsoft Sentinel (one-off enrichment)
# Generated by Rinox (rinox.io)
# Use Case: Enrich Sentinel watchlist IPs with Shodan host data
# Language: python
# ============================================================

# ============================================================
# SECTION 1: LOGGING
# ============================================================
import logging
import os
import sys
import json
import time
from datetime import datetime, timezone

import requests

# Resolve LOG_LEVEL case-insensitively. The logging module only recognises
# upper-case level names, so passing e.g. "debug" straight through would raise
# ValueError before the script could log anything.
_requested_level = os.environ.get("LOG_LEVEL", "INFO").strip().upper()
_resolved_level = getattr(logging, _requested_level, None)
# Only the numeric level constants (DEBUG, INFO, ...) are acceptable here;
# other upper-case attributes of the module (e.g. BASIC_FORMAT) are strings.
_level_is_valid = isinstance(_resolved_level, int) and not isinstance(_resolved_level, bool)

logging.basicConfig(
    level=_resolved_level if _level_is_valid else logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
log = logging.getLogger("rinox.shodan_to_sentinel")

if not _level_is_valid:
    # Report the bad setting once logging is usable instead of crashing on it.
    log.warning("Unknown LOG_LEVEL %r; falling back to INFO", _requested_level)

# ============================================================
# SECTION 2: AUTHENTICATION
# ============================================================
# All settings are read from environment variables; unset ones default to "".
SHODAN_API_KEY = os.environ.get("SHODAN_API_KEY", "")
AZURE_TENANT_ID = os.environ.get("AZURE_TENANT_ID", "")
AZURE_CLIENT_ID = os.environ.get("AZURE_CLIENT_ID", "")
AZURE_CLIENT_SECRET = os.environ.get("AZURE_CLIENT_SECRET", "")
SENTINEL_WORKSPACE_ID = os.environ.get("SENTINEL_WORKSPACE_ID", "")
SENTINEL_WATCHLIST = os.environ.get("SENTINEL_WATCHLIST_NAME", "")
SENTINEL_IP_COLUMN = os.environ.get("SENTINEL_IP_COLUMN", "IPAddress")
# Trailing slashes are stripped because deliver() appends "/dataCollectionRules/...".
SENTINEL_DCE = os.environ.get("SENTINEL_DCE_URL", "").rstrip("/")
SENTINEL_DCR_ID = os.environ.get("SENTINEL_DCR_IMMUTABLE_ID", "")
SENTINEL_STREAM = os.environ.get("SENTINEL_STREAM_NAME", "Custom-ShodanHostEnrichment_CL")

# Seconds to pause between Shodan lookups. Validate it up front: the value is
# handed to time.sleep(), which rejects negative and non-finite numbers, and a
# failure there would only surface after lookups had already been made.
_raw_rate_limit_sleep = os.environ.get("RATE_LIMIT_SLEEP", "1.0")
try:
    RATE_LIMIT_SLEEP = float(_raw_rate_limit_sleep)
except ValueError:
    log.error("RATE_LIMIT_SLEEP must be a number, got %r", _raw_rate_limit_sleep)
    sys.exit(1)
if not 0 <= RATE_LIMIT_SLEEP < float("inf"):
    # The chained comparison is False for negatives, NaN and infinity alike.
    log.error("RATE_LIMIT_SLEEP must be a finite, non-negative number, got %r", _raw_rate_limit_sleep)
    sys.exit(1)

# Settings without a usable default, keyed by the environment variable name
# so the error message tells the operator exactly what to export.
REQUIRED = {
    "SHODAN_API_KEY": SHODAN_API_KEY,
    "AZURE_TENANT_ID": AZURE_TENANT_ID, "AZURE_CLIENT_ID": AZURE_CLIENT_ID,
    "AZURE_CLIENT_SECRET": AZURE_CLIENT_SECRET,
    "SENTINEL_WORKSPACE_ID": SENTINEL_WORKSPACE_ID,
    "SENTINEL_WATCHLIST_NAME": SENTINEL_WATCHLIST,
    "SENTINEL_DCE_URL": SENTINEL_DCE, "SENTINEL_DCR_IMMUTABLE_ID": SENTINEL_DCR_ID,
}
missing = [k for k, v in REQUIRED.items() if not v]
if missing:
    # Abort before any network call is attempted with incomplete credentials.
    log.error("Missing env vars: %s", ", ".join(missing))
    sys.exit(1)


def azure_token(scope):
    """Request an OAuth2 access token for *scope* using the client-credentials grant.

    Posts the configured client id and secret to the tenant's
    login.microsoftonline.com v2.0 token endpoint.

    Args:
        scope: Scope string sent in the token request.

    Returns:
        The ``access_token`` value from the JSON response.

    Raises:
        requests.HTTPError: If the endpoint answers with an error status.
        requests.RequestException: On transport failures or timeout (30 s).
    """
    token_endpoint = f"https://login.microsoftonline.com/{AZURE_TENANT_ID}/oauth2/v2.0/token"
    form_fields = {
        "grant_type": "client_credentials",
        "client_id": AZURE_CLIENT_ID,
        "client_secret": AZURE_CLIENT_SECRET,
        "scope": scope,
    }
    reply = requests.post(token_endpoint, data=form_fields, timeout=30)
    reply.raise_for_status()
    payload = reply.json()
    return payload["access_token"]


# ============================================================
# SECTION 3: SOURCE SYSTEM CALLS (Sentinel watchlist + Shodan)
# ============================================================
def fetch_watchlist_ips():
    """Read the IP column of the configured Sentinel watchlist.

    Runs ``_GetWatchlist('<name>') | project <column>`` against the workspace
    through the Logs Query API and collects the first column of the first
    result table.

    Returns:
        A list of the non-empty values found in that column, in response
        order. Empty when the response carries no tables or no rows.

    Raises:
        requests.HTTPError: If the token or query endpoint answers with an
            error status.
        requests.RequestException: On transport failures or timeout.
    """
    token = azure_token("https://api.loganalytics.io/.default")
    kql = f"_GetWatchlist('{SENTINEL_WATCHLIST}') | project {SENTINEL_IP_COLUMN}"
    resp = requests.post(
        f"https://api.loganalytics.io/v1/workspaces/{SENTINEL_WORKSPACE_ID}/query",
        headers={"Authorization": f"Bearer {token}", "Content-Type": "application/json"},
        json={"query": kql}, timeout=60,
    )
    resp.raise_for_status()
    body = resp.json()

    # Guard the shape before indexing: a present-but-empty "tables" list would
    # make [0] raise IndexError, and a null "rows" would not be iterable.
    tables = body.get("tables") if isinstance(body, dict) else None
    if not tables:
        log.warning("Logs query returned no tables for watchlist %s", SENTINEL_WATCHLIST)
        return []
    rows = tables[0].get("rows") or []

    # Skip empty rows and rows whose projected cell is empty or null.
    return [r[0] for r in rows if r and r[0]]


def shodan_lookup(ip):
    """Query the Shodan host endpoint for *ip*.

    Args:
        ip: Address to look up; interpolated into the request path as-is.

    Returns:
        A ``(status_code, data)`` tuple. ``data`` is the decoded JSON object
        when the response declares an ``application/json`` content type and
        the body is a JSON object; otherwise it is an empty dict.

    Raises:
        requests.RequestException: On transport failures, timeout (30 s), or
            a body that claims to be JSON but cannot be decoded.
    """
    resp = requests.get(
        f"https://api.shodan.io/shodan/host/{ip}",
        params={"key": SHODAN_API_KEY},
        timeout=30,
    )
    content_type = resp.headers.get("content-type", "")
    if not content_type.startswith("application/json"):
        return resp.status_code, {}

    try:
        payload = resp.json()
    except ValueError as err:
        # Depending on the installed requests version a decode failure may be
        # a plain ValueError; normalise it so callers that catch
        # RequestException see it either way. The message deliberately omits
        # the request URL, which carries the API key.
        raise requests.RequestException(f"Shodan returned malformed JSON for {ip}") from err

    # Callers use dict.get() on the result, so anything other than a JSON
    # object is treated like a response without usable data.
    if not isinstance(payload, dict):
        payload = {}
    return resp.status_code, payload


# ============================================================
# SECTION 4: TRANSLATION
# ============================================================
def translate(ip, shodan_data):
    """Map a Shodan host document onto the flat record sent to Sentinel.

    Args:
        ip: The address the lookup was made for; copied to ``IPAddress``.
        shodan_data: Mapping holding the Shodan host response.

    Returns:
        A dict with the keys TimeGenerated, IPAddress, Country, ASN, Org, OS,
        Ports, Hostnames, Vulns, LastUpdate and Tags. List-valued fields fall
        back to ``[]`` when missing or empty; scalar fields fall back to
        ``None``. ``TimeGenerated`` is the current UTC time in ISO 8601 form.
    """
    field = shodan_data.get

    # "vulns" may arrive as a mapping keyed by identifier or as a plain
    # sequence; only the identifiers are kept in either case.
    raw_vulns = field("vulns")
    if isinstance(raw_vulns, dict):
        vuln_ids = list(raw_vulns.keys())
    else:
        vuln_ids = raw_vulns or []

    record = {"TimeGenerated": datetime.now(timezone.utc).isoformat()}
    record["IPAddress"] = ip
    record["Country"] = field("country_name")
    record["ASN"] = field("asn")
    record["Org"] = field("org")
    record["OS"] = field("os")
    record["Ports"] = field("ports") or []
    record["Hostnames"] = field("hostnames") or []
    record["Vulns"] = vuln_ids
    record["LastUpdate"] = field("last_update")
    record["Tags"] = field("tags") or []
    return record


# ============================================================
# SECTION 5: TARGET SYSTEM CALLS (Sentinel Logs Ingestion)
# ============================================================
def deliver(records):
    """Post *records* to the configured Sentinel data collection rule stream.

    Args:
        records: List of JSON-serialisable dicts; sent as one JSON array.

    Returns:
        True when there was nothing to send or the endpoint answered with a
        status below 300. False when the token request or the upload failed;
        the failure is logged rather than raised so the caller can rely on
        the return value alone.
    """
    if not records:
        return True

    url = f"{SENTINEL_DCE}/dataCollectionRules/{SENTINEL_DCR_ID}/streams/{SENTINEL_STREAM}?api-version=2023-01-01"
    try:
        # NOTE: the double slash in the scope is intentional; it matches the
        # scope string used in Microsoft's Logs Ingestion API samples.
        token = azure_token("https://monitor.azure.com//.default")
        resp = requests.post(
            url,
            headers={"Authorization": f"Bearer {token}", "Content-Type": "application/json"},
            data=json.dumps(records), timeout=60,
        )
    except requests.RequestException as err:
        # Token or transport errors used to escape as exceptions, bypassing
        # the boolean contract the caller checks.
        log.error("Sentinel delivery failed: %s", err)
        return False

    if resp.status_code >= 300:
        log.error("Sentinel delivery failed: %s %s", resp.status_code, resp.text[:300])
        return False
    return True


# ============================================================
# SECTION 6: RINOX
# ============================================================
# Identifier for this run: the current Unix time, truncated to whole seconds.
RINOX_RUN_ID = f"{int(time.time())}"
log.info("Rinox run started: id=%s", RINOX_RUN_ID)


# ============================================================
# SECTION 7: MAIN ORCHESTRATOR (one-off)
# ============================================================
def _redact_key(value):
    """Return *value* as text with the Shodan API key masked."""
    text = str(value)
    if SHODAN_API_KEY:
        text = text.replace(SHODAN_API_KEY, "***")
    return text


def _lookup_with_retry(ip, max_attempts=3, backoff_seconds=30):
    """Look up *ip*, pausing and retrying while Shodan answers HTTP 429."""
    for attempt in range(1, max_attempts + 1):
        code, data = shodan_lookup(ip)
        if code != 429 or attempt == max_attempts:
            break
        log.warning(
            "Shodan rate-limited for %s; sleeping %ss (attempt %d/%d)",
            ip, backoff_seconds, attempt, max_attempts,
        )
        time.sleep(backoff_seconds)
    return code, data


def main():
    """Run the one-off enrichment: watchlist -> Shodan -> Sentinel.

    Loads the IPs from the watchlist, looks each one up in Shodan, translates
    successful responses into records and delivers them in a single upload.
    HTTP 404 responses are counted as not found; any other status of 300 or
    above (including a 429 that persists after retries) and any request
    exception are counted as errors.

    Exits the process with status 1 when the watchlist cannot be fetched or
    the delivery fails.
    """
    try:
        ips = fetch_watchlist_ips()
    except requests.RequestException as e:
        log.error("Watchlist fetch failed: %s", e)
        sys.exit(1)

    log.info("Loaded %d IPs from watchlist %s", len(ips), SENTINEL_WATCHLIST)
    records = []
    not_found = 0
    errors = 0

    for ip in ips:
        try:
            # Retry on 429 so a rate-limited IP is not silently dropped.
            code, data = _lookup_with_retry(ip)
        except requests.RequestException as e:
            # The API key is sent as a query parameter and requests may embed
            # the full URL in its exception text, so mask it before logging.
            log.error("Shodan request failed for %s: %s", ip, _redact_key(e))
            errors += 1
            continue
        if code == 404:
            not_found += 1
        elif code >= 300:
            log.error("Shodan %s for %s: %s", code, ip, _redact_key(data)[:200])
            errors += 1
        else:
            records.append(translate(ip, data))
        # Pace successive lookups by the configured delay.
        time.sleep(RATE_LIMIT_SLEEP)

    try:
        delivered = deliver(records)
    except requests.RequestException as e:
        # Treat a raised transport/token error like a reported failure so the
        # run ends with a log line and exit code instead of a traceback.
        log.error("Sentinel delivery raised: %s", e)
        delivered = False
    if not delivered:
        log.error("Delivery failed.")
        sys.exit(1)
    log.info("Run complete: enriched=%d not_found=%d errors=%d", len(records), not_found, errors)


# Run the enrichment only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()

Useful?

Used by 0 teams · Viewed 4 times · Last validated 5/17/2026