pushpythonscheduled✓ IRON 91hand-curated
IBM QRadar → Cortex XSOAR (Palo Alto)
Mirror new QRadar offenses into Cortex XSOAR every 5 minutes
Polls QRadar for new offenses and creates matching incidents in Cortex XSOAR. Tracks offense IDs to prevent duplicate incidents.
qradarxsoaroffenseincidentscheduledsyncmoderate
# ============================================================
# RINOX INTEGRATION: IBM QRadar -> Cortex XSOAR
# Generated by Rinox (rinox.io)
# Use Case: 5-minute offense mirroring into XSOAR incidents
# Language: python
# ============================================================
# ============================================================
# SECTION 1: LOGGING
# ============================================================
import logging
import os
import sys
import json
import time
from collections import deque
from datetime import datetime, timezone
from pathlib import Path
import requests
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
logging.basicConfig(
level=os.environ.get("LOG_LEVEL", "INFO"),
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
log = logging.getLogger("rinox.qradar_to_xsoar")
# ============================================================
# SECTION 2: AUTHENTICATION
# ============================================================
QRADAR_URL = os.environ.get("QRADAR_URL", "").rstrip("/")
QRADAR_TOKEN = os.environ.get("QRADAR_SEC_TOKEN", "")
XSOAR_URL = os.environ.get("XSOAR_URL", "").rstrip("/")
XSOAR_API_KEY = os.environ.get("XSOAR_API_KEY", "")
XSOAR_API_KEY_ID = os.environ.get("XSOAR_API_KEY_ID", "")
STATE_PATH = Path(os.environ.get("STATE_PATH", "./qradar_xsoar_state.json"))
FIFO_CAP = int(os.environ.get("FIFO_CAP", "5000"))
VERIFY_TLS = os.environ.get("QRADAR_VERIFY_TLS", "false").lower() == "true"
REQUIRED = {
"QRADAR_URL": QRADAR_URL, "QRADAR_SEC_TOKEN": QRADAR_TOKEN,
"XSOAR_URL": XSOAR_URL, "XSOAR_API_KEY": XSOAR_API_KEY,
"XSOAR_API_KEY_ID": XSOAR_API_KEY_ID,
}
missing = [k for k, v in REQUIRED.items() if not v]
if missing:
log.error("Missing env vars: %s", ", ".join(missing))
sys.exit(1)
def load_state():
if STATE_PATH.exists():
d = json.loads(STATE_PATH.read_text())
return {
"last_start_time": int(d.get("last_start_time", 0)),
"processed_ids": deque(d.get("processed_ids", []), maxlen=FIFO_CAP),
"processed_lookup": set(int(x) for x in d.get("processed_ids", [])),
}
return {"last_start_time": 0, "processed_ids": deque(maxlen=FIFO_CAP), "processed_lookup": set()}
def save_state(state):
payload = {
"last_start_time": int(state["last_start_time"]),
"processed_ids": [int(x) for x in state["processed_ids"]],
}
tmp = STATE_PATH.with_suffix(".tmp")
tmp.write_text(json.dumps(payload))
tmp.replace(STATE_PATH)
# ============================================================
# SECTION 3: SOURCE SYSTEM CALLS (QRadar)
# ============================================================
QRADAR_HEADERS = {"SEC": QRADAR_TOKEN, "Accept": "application/json", "Version": "12.0"}
def fetch_offenses(since_ms: int):
params = {
"filter": f"start_time>{since_ms} and status='OPEN'",
"sort": "+start_time",
}
resp = requests.get(f"{QRADAR_URL}/api/siem/offenses", headers=QRADAR_HEADERS,
params=params, verify=VERIFY_TLS, timeout=120)
resp.raise_for_status()
return resp.json()
# ============================================================
# SECTION 4: TRANSLATION
# ============================================================
SEVERITY_MAP = {1: "Low", 2: "Low", 3: "Low", 4: "Medium", 5: "Medium",
6: "Medium", 7: "High", 8: "High", 9: "Critical", 10: "Critical"}
def translate(offenses, processed_lookup):
payload = []
all_seen = []
max_start = 0
for off in offenses:
oid = off.get("id")
if oid is None:
continue
oid = int(oid)
all_seen.append(oid)
try:
start_ms = int(off.get("start_time", 0))
except (TypeError, ValueError):
start_ms = 0
if start_ms > max_start:
max_start = start_ms
if oid in processed_lookup:
continue
sev_raw = off.get("severity") or 0
try:
sev = SEVERITY_MAP.get(int(sev_raw), "Medium")
except (TypeError, ValueError):
sev = "Medium"
payload.append({
"name": f"QRadar Offense {oid}: {off.get('description', 'no description')[:120]}",
"type": "QRadar Offense",
"severity": sev,
"occurred": datetime.fromtimestamp(start_ms / 1000.0, tz=timezone.utc).isoformat() if start_ms else None,
"details": (off.get("description") or "")[:1000],
"rawJSON": json.dumps(off),
"labels": [
{"type": "qradar_offense_id", "value": str(oid)},
{"type": "magnitude", "value": str(off.get("magnitude", ""))},
],
})
return payload, max_start, all_seen
# ============================================================
# SECTION 5: TARGET SYSTEM CALLS (XSOAR)
# ============================================================
XSOAR_HEADERS = {
"Authorization": XSOAR_API_KEY,
"x-xdr-auth-id": XSOAR_API_KEY_ID,
"Content-Type": "application/json",
"Accept": "application/json",
}
def deliver(incidents):
if not incidents:
return True
# XSOAR /incident/batch accepts an array of incident objects.
resp = requests.post(f"{XSOAR_URL}/incident/batch",
headers=XSOAR_HEADERS,
data=json.dumps({"incidents": incidents}),
verify=VERIFY_TLS, timeout=120)
if resp.status_code >= 300:
log.error("XSOAR delivery failed: %s %s", resp.status_code, resp.text[:300])
return False
return True
# ============================================================
# SECTION 6: RINOX
# ============================================================
RINOX_RUN_ID = str(int(time.time()))
log.info("Rinox run started: id=%s", RINOX_RUN_ID)
# ============================================================
# SECTION 7: MAIN ORCHESTRATOR
# ============================================================
def main():
state = load_state()
log.info("Fetching offenses with start_time>%s", state["last_start_time"])
try:
offenses = fetch_offenses(state["last_start_time"])
except requests.RequestException as e:
log.error("QRadar fetch failed: %s", e)
sys.exit(1)
if not offenses:
log.info("No new offenses.")
return
payload, new_cursor, all_seen = translate(offenses, state["processed_lookup"])
if not payload:
if new_cursor > state["last_start_time"]:
state["last_start_time"] = new_cursor
for sid in all_seen:
if sid not in state["processed_lookup"]:
state["processed_ids"].append(sid)
state["processed_lookup"].add(sid)
save_state(state)
return
if not deliver(payload):
log.error("Delivery failed — state NOT updated, will retry next run")
sys.exit(1)
if new_cursor > state["last_start_time"]:
state["last_start_time"] = new_cursor
for sid in all_seen:
if sid not in state["processed_lookup"]:
state["processed_ids"].append(sid)
state["processed_lookup"].add(sid)
save_state(state)
log.info("Run complete: created=%d cursor=%s", len(payload), new_cursor)
if __name__ == "__main__":
main()
Useful?
Used by 0 teams · Viewed 4 times · Last validated 5/17/2026