push · python · scheduled · ✓ IRON 92 · hand-curated
CrowdStrike Falcon → Microsoft Sentinel
Forward CrowdStrike detections to Microsoft Sentinel every 15 minutes
Polls the CrowdStrike Falcon detections API every 15 minutes and forwards new detections to a Sentinel custom log table via the Logs Ingestion API.
crowdstrike · sentinel · detection · scheduled · push · moderate
# ============================================================
# RINOX INTEGRATION: CrowdStrike Falcon -> Microsoft Sentinel
# Generated by Rinox (rinox.io)
# Use Case: 15-min detection forwarding via Logs Ingestion API
# Language: python
# ============================================================
# ============================================================
# SECTION 1: LOGGING
# ============================================================
import logging
import os
import sys
import json
import time
from collections import deque
from datetime import datetime, timezone
from pathlib import Path
import requests
# Configure the root logger once at import time. The level is read from the
# LOG_LEVEL environment variable and falls back to INFO when it is unset.
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    level=os.getenv("LOG_LEVEL", "INFO"),
)
# Module-level logger used by the rest of this script.
log = logging.getLogger("rinox.cs_to_sentinel")
# ============================================================
# SECTION 2: AUTHENTICATION
# ============================================================
# CrowdStrike API base URL (trailing slash removed) and OAuth2 client credentials.
CS_BASE = os.getenv("CROWDSTRIKE_BASE_URL", "https://api.crowdstrike.com").rstrip("/")
CS_CLIENT_ID = os.getenv("CROWDSTRIKE_CLIENT_ID", "")
CS_CLIENT_SECRET = os.getenv("CROWDSTRIKE_CLIENT_SECRET", "")

# Sentinel ingestion target: data collection endpoint URL (trailing slash
# removed), data collection rule ID, and stream name.
SENTINEL_DCE = os.getenv("SENTINEL_DCE_URL", "").rstrip("/")
SENTINEL_DCR_ID = os.getenv("SENTINEL_DCR_IMMUTABLE_ID", "")
SENTINEL_STREAM = os.getenv("SENTINEL_STREAM_NAME", "Custom-CrowdStrikeDetections_CL")

# Azure AD application credentials used to request the ingestion token.
AZURE_TENANT_ID = os.getenv("AZURE_TENANT_ID", "")
AZURE_CLIENT_ID = os.getenv("AZURE_CLIENT_ID", "")
AZURE_CLIENT_SECRET = os.getenv("AZURE_CLIENT_SECRET", "")

# Location of the JSON state file and the maximum number of processed
# detection IDs remembered for de-duplication.
STATE_PATH = Path(os.getenv("STATE_PATH", "./cs_sentinel_state.json"))
FIFO_CAP = int(os.getenv("FIFO_CAP", "10000"))

# Settings that have no usable default, keyed by environment variable name.
REQUIRED = {
    "CROWDSTRIKE_CLIENT_ID": CS_CLIENT_ID,
    "CROWDSTRIKE_CLIENT_SECRET": CS_CLIENT_SECRET,
    "SENTINEL_DCE_URL": SENTINEL_DCE,
    "SENTINEL_DCR_IMMUTABLE_ID": SENTINEL_DCR_ID,
    "AZURE_TENANT_ID": AZURE_TENANT_ID,
    "AZURE_CLIENT_ID": AZURE_CLIENT_ID,
    "AZURE_CLIENT_SECRET": AZURE_CLIENT_SECRET,
}
# Abort at import time when any required setting is empty.
missing = [name for name, value in REQUIRED.items() if not value]
if missing:
    log.error("Missing env vars: %s", ", ".join(missing))
    sys.exit(1)
def cs_token():
    """Request an OAuth2 access token from the CrowdStrike API.

    Posts the configured client ID and secret as form data to
    ``{CS_BASE}/oauth2/token`` with a 30-second timeout.

    Returns:
        The ``access_token`` value from the JSON response.

    Raises:
        requests.HTTPError: If the response carries an error status
            (raised by ``raise_for_status``).
        KeyError: If the response JSON has no ``access_token`` key.
    """
    token_url = f"{CS_BASE}/oauth2/token"
    credentials = {"client_id": CS_CLIENT_ID, "client_secret": CS_CLIENT_SECRET}
    reply = requests.post(token_url, data=credentials, timeout=30)
    reply.raise_for_status()
    body = reply.json()
    return body["access_token"]
def azure_token():
    """Request an Azure AD access token using the client-credentials grant.

    Posts the configured Azure client ID and secret to the tenant's
    ``oauth2/v2.0/token`` endpoint with a 30-second timeout, asking for the
    ``https://monitor.azure.com//.default`` scope.

    Returns:
        The ``access_token`` value from the JSON response.

    Raises:
        requests.HTTPError: If the response carries an error status
            (raised by ``raise_for_status``).
        KeyError: If the response JSON has no ``access_token`` key.
    """
    token_url = f"https://login.microsoftonline.com/{AZURE_TENANT_ID}/oauth2/v2.0/token"
    form = {
        "grant_type": "client_credentials",
        "client_id": AZURE_CLIENT_ID,
        "client_secret": AZURE_CLIENT_SECRET,
        "scope": "https://monitor.azure.com//.default",
    }
    reply = requests.post(token_url, data=form, timeout=30)
    reply.raise_for_status()
    body = reply.json()
    return body["access_token"]
def load_state():
    """Load the persisted cursor and de-duplication window from STATE_PATH.

    A missing state file is treated as an empty state.

    Returns:
        A dict with three keys:
        ``last_created_ts`` -- int cursor, 0 when nothing is stored;
        ``processed_ids`` -- deque of detection IDs bounded to FIFO_CAP;
        ``processed_lookup`` -- set holding exactly the IDs in that deque.

    Raises:
        json.JSONDecodeError: If the state file exists but is not valid JSON.
    """
    # Read directly and handle the missing-file case, instead of checking
    # for existence first and racing against a concurrent removal.
    try:
        stored = json.loads(STATE_PATH.read_text())
    except FileNotFoundError:
        stored = {}
    # "or" guards against a JSON null stored for either field.
    recent_ids = deque(stored.get("processed_ids") or [], maxlen=FIFO_CAP)
    return {
        "last_created_ts": int(stored.get("last_created_ts") or 0),
        "processed_ids": recent_ids,
        # Build the set from the bounded deque, not from the raw stored list:
        # when the file holds more than FIFO_CAP IDs the deque keeps only the
        # newest ones, and the set must describe the same window.
        "processed_lookup": set(recent_ids),
    }
def save_state(state):
    """Write the cursor and processed-ID window to STATE_PATH as JSON.

    The JSON is first written to a sibling file carrying a ``.tmp`` suffix and
    then moved over STATE_PATH with ``Path.replace``, so the state file is
    swapped in by a rename rather than rewritten in place.

    Args:
        state: Mapping with ``last_created_ts`` and ``processed_ids`` keys.
            Any other keys (such as ``processed_lookup``) are not persisted.
    """
    snapshot = dict(
        last_created_ts=int(state["last_created_ts"]),
        processed_ids=[*state["processed_ids"]],
    )
    scratch = STATE_PATH.with_suffix(".tmp")
    scratch.write_text(json.dumps(snapshot))
    scratch.replace(STATE_PATH)
# ============================================================
# SECTION 3: SOURCE SYSTEM CALLS (CrowdStrike)
# ============================================================
def fetch_detections(token, since_iso):
    """Fetch detail records for CrowdStrike detections created after a cutoff.

    Two calls are made: a query for detection IDs whose ``created_timestamp``
    is greater than ``since_iso`` and whose status is ``new`` (sorted by
    creation time ascending, ``limit`` 200), followed by one POST that
    retrieves the summaries for those IDs. This function does not paginate.

    Args:
        token: CrowdStrike bearer token.
        since_iso: Timestamp string interpolated into the FQL filter.

    Returns:
        The ``resources`` list of the summaries response, or an empty list
        when the ID query yields nothing.

    Raises:
        requests.HTTPError: If either response carries an error status.
    """
    auth_headers = {"Authorization": f"Bearer {token}"}
    query = {
        "filter": f"created_timestamp:>'{since_iso}'+status:'new'",
        "limit": 200,
        "sort": "created_timestamp.asc",
    }
    id_reply = requests.get(
        f"{CS_BASE}/detects/queries/detects/v1",
        headers=auth_headers,
        params=query,
        timeout=60,
    )
    id_reply.raise_for_status()
    detection_ids = id_reply.json().get("resources", [])
    if detection_ids:
        json_headers = dict(auth_headers)
        json_headers["Content-Type"] = "application/json"
        summary_reply = requests.post(
            f"{CS_BASE}/detects/entities/summaries/GET/v1",
            headers=json_headers,
            json={"ids": detection_ids},
            timeout=60,
        )
        summary_reply.raise_for_status()
        return summary_reply.json().get("resources", [])
    return []
# ============================================================
# SECTION 4: TRANSLATION
# ============================================================
def _epoch_seconds(stamp):
    """Convert an ISO-8601 timestamp string to whole epoch seconds (0 if unparseable)."""
    if not isinstance(stamp, str):
        return 0
    text = stamp.strip()
    if text[-1:] in ("Z", "z"):
        text = text[:-1] + "+00:00"
    # datetime.fromisoformat() only accepts arbitrary-length fractions from
    # Python 3.11 on; normalise the fraction to exactly six digits so that
    # longer (e.g. nanosecond) or shorter fractions parse on older versions too.
    whole, dot, rest = text.partition(".")
    if dot:
        n_digits = len(rest) - len(rest.lstrip("0123456789"))
        if n_digits:
            micros = rest[:n_digits][:6].ljust(6, "0")
            text = f"{whole}.{micros}{rest[n_digits:]}"
    try:
        moment = datetime.fromisoformat(text)
        if moment.tzinfo is None:
            # The caller turns the cursor back into a UTC datetime, so read
            # offset-less timestamps as UTC rather than as local time.
            moment = moment.replace(tzinfo=timezone.utc)
        return int(moment.timestamp())
    except (ValueError, OverflowError):
        return 0


def translate(detections, processed_lookup):
    """Convert CrowdStrike detection dicts into Sentinel ingestion records.

    Args:
        detections: Iterable of detection dicts.
        processed_lookup: Container of detection IDs that were already
            forwarded; detections whose ID is in it produce no record.

    Returns:
        A ``(payload, max_ts, all_seen)`` tuple:
        ``payload`` -- list of record dicts for detections not yet processed;
        ``max_ts`` -- largest creation time (epoch seconds) among all
        detections that have an ID, or 0 when none could be parsed;
        ``all_seen`` -- IDs of every detection that has one, in input order,
        including those that were already processed.
    """
    payload = []
    all_seen = []
    max_ts = 0
    for det in detections:
        det_id = det.get("detection_id") or det.get("composite_id")
        if not det_id:
            # Without an ID the detection cannot be de-duplicated; skip it.
            continue
        all_seen.append(det_id)
        created = det.get("created_timestamp") or det.get("date_updated") or ""
        # The cursor considers already-processed detections as well.
        max_ts = max(max_ts, _epoch_seconds(created))
        if det_id in processed_lookup:
            continue
        device = det.get("device") or {}
        payload.append({
            "TimeGenerated": created or datetime.now(timezone.utc).isoformat(),
            "DetectionId": det_id,
            "Severity": det.get("max_severity_displayname", "Unknown"),
            "SeverityValue": det.get("max_severity", 0),
            "Tactic": det.get("tactic"),
            "Technique": det.get("technique"),
            "DeviceHostname": device.get("hostname"),
            "DeviceId": device.get("device_id"),
            "Status": det.get("status"),
            "Description": det.get("description", ""),
            "RawEvent": det,
        })
    return payload, max_ts, all_seen
# ============================================================
# SECTION 5: TARGET SYSTEM CALLS (Sentinel Logs Ingestion API)
# ============================================================
def deliver(records, az_token):
    """POST translated records to the Sentinel Logs Ingestion endpoint.

    Args:
        records: List of record dicts to send; an empty value sends nothing.
        az_token: Azure bearer token for the ingestion endpoint.

    Returns:
        True when there was nothing to send or the endpoint answered with a
        status below 300; False on any other status or on a transport error
        (connection failure, timeout, ...), so the caller can retry later.
    """
    if not records:
        return True
    url = f"{SENTINEL_DCE}/dataCollectionRules/{SENTINEL_DCR_ID}/streams/{SENTINEL_STREAM}?api-version=2023-01-01"
    headers = {"Authorization": f"Bearer {az_token}", "Content-Type": "application/json"}
    # The Logs Ingestion API expects a JSON array of records.
    # NOTE(review): all records go out in a single request; confirm the body
    # stays within the ingestion API's per-call size limit.
    try:
        resp = requests.post(url, headers=headers, data=json.dumps(records), timeout=60)
    except requests.RequestException as exc:
        # Report transport errors through the same False result as HTTP
        # errors instead of letting them escape as an exception.
        log.error("Sentinel delivery failed: %s", exc)
        return False
    if resp.status_code >= 300:
        log.error("Sentinel delivery failed: %s %s", resp.status_code, resp.text[:300])
        return False
    return True
# ============================================================
# SECTION 6: RINOX
# ============================================================
# Run identifier: the current time as whole epoch seconds, in string form.
RINOX_RUN_ID = f"{int(time.time())}"
log.info("Rinox run started: id=%s", RINOX_RUN_ID)
# ============================================================
# SECTION 7: MAIN ORCHESTRATOR
# ============================================================
def _commit_progress(state, new_cursor, seen_ids):
    """Advance the cursor, remember the seen detection IDs, and save the state."""
    if new_cursor > state["last_created_ts"]:
        state["last_created_ts"] = new_cursor
    ring = state["processed_ids"]
    lookup = state["processed_lookup"]
    for det_id in seen_ids:
        if det_id in lookup:
            continue
        if ring and len(ring) == ring.maxlen:
            # The bounded deque drops its oldest entry on append; drop it from
            # the lookup set as well so both describe the same window.
            lookup.discard(ring[0])
        ring.append(det_id)
        lookup.add(det_id)
    save_state(state)


def main():
    """Run one poll-and-forward cycle.

    Loads the saved state, fetches detections created after the saved cursor,
    translates the ones not yet processed, delivers them to Sentinel and, only
    after a successful delivery, advances and saves the cursor together with
    the processed IDs.

    Exits the process with status 1 when CrowdStrike or Azure authentication,
    the CrowdStrike fetch, or the Sentinel delivery fails; in those cases the
    cursor is left untouched so the next run retries the same window.
    """
    state = load_state()
    if not state["last_created_ts"]:
        # First run: start from "now" and persist that starting point right
        # away. Without this an empty first poll leaves no cursor behind, so
        # every later run would again start from its own "now" and never
        # return anything.
        state["last_created_ts"] = int(datetime.now(timezone.utc).timestamp())
        save_state(state)
    since_iso = datetime.fromtimestamp(state["last_created_ts"], tz=timezone.utc).isoformat()

    try:
        cs = cs_token()
    except requests.RequestException as e:
        log.error("CrowdStrike authentication failed: %s", e)
        sys.exit(1)
    try:
        detections = fetch_detections(cs, since_iso)
    except requests.RequestException as e:
        log.error("CrowdStrike fetch failed: %s", e)
        sys.exit(1)
    if not detections:
        log.info("No new detections.")
        return

    payload, new_cursor, all_seen = translate(detections, state["processed_lookup"])
    if not payload:
        # Everything returned was already forwarded; still record progress.
        _commit_progress(state, new_cursor, all_seen)
        return

    # Request the Azure token only once there is something to deliver.
    try:
        az = azure_token()
    except requests.RequestException as e:
        log.error("Azure authentication failed: %s", e)
        sys.exit(1)
    try:
        delivered = deliver(payload, az)
    except requests.RequestException as e:
        log.error("Sentinel delivery failed: %s", e)
        delivered = False
    if not delivered:
        log.error("Delivery failed — state NOT updated, will retry next run")
        sys.exit(1)

    _commit_progress(state, new_cursor, all_seen)
    log.info("Run complete: delivered=%d, cursor=%s", len(payload), new_cursor)


# Run the integration only when this file is executed as a script.
if __name__ == "__main__":
    main()
Useful?
Used by 0 teams · Viewed 4 times · Last validated 5/17/2026