rinoxRinox
push · python · scheduled · ✓ IRON 91 · hand-curated

IBM QRadar → Cortex XSOAR (Palo Alto)

Mirror new QRadar offenses into Cortex XSOAR every 5 minutes

Polls QRadar for new offenses and creates matching incidents in Cortex XSOAR. Tracks offense IDs to prevent duplicate incidents.

qradar · xsoar · offense · incident · scheduled · sync · moderate
# ============================================================
# RINOX INTEGRATION: IBM QRadar -> Cortex XSOAR
# Generated by Rinox (rinox.io)
# Use Case: 5-minute offense mirroring into XSOAR incidents
# Language: python
# ============================================================

# ============================================================
# SECTION 1: LOGGING
# ============================================================
import logging
import os
import sys
import json
import time
from collections import deque
from datetime import datetime, timezone
from pathlib import Path

import requests
import urllib3

# Silence urllib3's InsecureRequestWarning: the HTTP calls in this file pass
# verify=VERIFY_TLS, which is off unless explicitly enabled.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# Root logging configuration; the level is read from LOG_LEVEL (default INFO).
logging.basicConfig(
    level=os.environ.get("LOG_LEVEL", "INFO"),
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
# Module-wide logger used by every section of this script.
log = logging.getLogger("rinox.qradar_to_xsoar")

# ============================================================
# SECTION 2: AUTHENTICATION
# ============================================================
# Base URL of the QRadar console; the trailing slash is stripped so API paths
# can be appended directly.
QRADAR_URL = os.environ.get("QRADAR_URL", "").rstrip("/")
# QRadar token, sent as the SEC header on QRadar requests.
QRADAR_TOKEN = os.environ.get("QRADAR_SEC_TOKEN", "")
# Base URL of the XSOAR server, trailing slash stripped.
XSOAR_URL = os.environ.get("XSOAR_URL", "").rstrip("/")
# XSOAR API key and key ID, sent as the Authorization / x-xdr-auth-id headers.
XSOAR_API_KEY = os.environ.get("XSOAR_API_KEY", "")
XSOAR_API_KEY_ID = os.environ.get("XSOAR_API_KEY_ID", "")
# JSON file that carries the cursor and processed offense IDs between runs.
STATE_PATH = Path(os.environ.get("STATE_PATH", "./qradar_xsoar_state.json"))
# Maximum number of processed offense IDs kept in the bounded deque.
FIFO_CAP = int(os.environ.get("FIFO_CAP", "5000"))
# TLS certificate verification flag. Despite the QRADAR_ prefix it is passed
# to BOTH the QRadar and the XSOAR requests. Only the string "true"
# (case-insensitive) enables it.
# NOTE(review): verification is disabled by default — consider defaulting to
# verified TLS once certificates are in place.
VERIFY_TLS = os.environ.get("QRADAR_VERIFY_TLS", "false").lower() == "true"

# Settings that have no usable default; any empty value aborts the run below.
REQUIRED = {
    "QRADAR_URL": QRADAR_URL, "QRADAR_SEC_TOKEN": QRADAR_TOKEN,
    "XSOAR_URL": XSOAR_URL, "XSOAR_API_KEY": XSOAR_API_KEY,
    "XSOAR_API_KEY_ID": XSOAR_API_KEY_ID,
}
# Names of the required variables that are unset or empty.
missing = [k for k, v in REQUIRED.items() if not v]
if missing:
    # Fail fast at import time, before any network call is attempted.
    log.error("Missing env vars: %s", ", ".join(missing))
    sys.exit(1)


def load_state():
    """Load the persisted sync cursor and processed-offense IDs.

    Returns:
        A dict with three keys:
          - "last_start_time": int cursor (0 when no state file exists).
          - "processed_ids": deque of int offense IDs, bounded by FIFO_CAP.
          - "processed_lookup": set holding exactly the IDs in the deque,
            for O(1) membership tests.

    Raises:
        ValueError / TypeError: the state file exists but is malformed
            (json.JSONDecodeError is a ValueError). This is deliberately not
            swallowed: silently resetting the cursor to 0 would re-fetch every
            open offense on the next run.
    """
    if not STATE_PATH.exists():
        return {"last_start_time": 0, "processed_ids": deque(maxlen=FIFO_CAP), "processed_lookup": set()}

    d = json.loads(STATE_PATH.read_text())
    # Normalise to int once and let the deque apply the FIFO cap, then derive
    # the lookup set from the deque so both structures always agree.
    ids = deque((int(x) for x in d.get("processed_ids") or []), maxlen=FIFO_CAP)
    return {
        # `or 0` tolerates an explicit JSON null as well as a missing key.
        "last_start_time": int(d.get("last_start_time") or 0),
        "processed_ids": ids,
        "processed_lookup": set(ids),
    }


def save_state(state):
    """Write the cursor and processed offense IDs to STATE_PATH.

    Args:
        state: dict as returned by load_state(); only "last_start_time" and
            "processed_ids" are persisted (the lookup set is rebuilt on load).

    The JSON is written to a sibling ".tmp" file first and then moved over
    STATE_PATH, so the real state file is only ever replaced by a fully
    written one.
    """
    snapshot = {
        "last_start_time": int(state["last_start_time"]),
        "processed_ids": list(map(int, state["processed_ids"])),
    }
    scratch = STATE_PATH.with_suffix(".tmp")
    serialised = json.dumps(snapshot)
    scratch.write_text(serialised)
    scratch.replace(STATE_PATH)


# ============================================================
# SECTION 3: SOURCE SYSTEM CALLS (QRadar)
# ============================================================
# Headers sent with every QRadar request: SEC carries the token and Version
# is set to "12.0".
QRADAR_HEADERS = {
    "SEC": QRADAR_TOKEN,
    "Accept": "application/json",
    "Version": "12.0",
}


def fetch_offenses(since_ms: int):
    """Fetch OPEN QRadar offenses whose start_time is strictly after since_ms.

    Args:
        since_ms: cursor value interpolated into the start_time filter.

    Returns:
        The decoded JSON body of the /api/siem/offenses response, requested
        in ascending start_time order.

    Raises:
        requests.RequestException: on transport errors or a non-success HTTP
            status (via raise_for_status).
    """
    endpoint = f"{QRADAR_URL}/api/siem/offenses"
    query = {
        # Strictly-greater comparison: offenses whose start_time equals the
        # cursor are not returned again.
        "filter": f"start_time>{since_ms} and status='OPEN'",
        "sort": "+start_time",
    }
    response = requests.get(
        endpoint,
        headers=QRADAR_HEADERS,
        params=query,
        verify=VERIFY_TLS,
        timeout=120,
    )
    response.raise_for_status()
    return response.json()


# ============================================================
# SECTION 4: TRANSLATION
# ============================================================
# QRadar severity value -> label placed in the incident's "severity" field.
# Values outside 1..10 (including 0 / missing) fall back to "Medium" in
# translate().
# NOTE(review): the XSOAR incident API documents severity as a number —
# confirm the target instance accepts these string labels.
SEVERITY_MAP = {1: "Low", 2: "Low", 3: "Low", 4: "Medium", 5: "Medium",
                6: "Medium", 7: "High", 8: "High", 9: "Critical", 10: "Critical"}


def translate(offenses, processed_lookup):
    """Convert QRadar offenses into XSOAR incident dicts.

    Args:
        offenses: iterable of offense dicts as returned by QRadar.
        processed_lookup: container of int offense IDs that were already
            mirrored; matching offenses are tracked but not re-emitted.

    Returns:
        A tuple (payload, max_start, all_seen):
          - payload: list of incident dicts for offenses not yet processed.
          - max_start: largest start_time (ms) seen across ALL offenses that
            carry an id, or 0 if none parsed.
          - all_seen: int IDs of every offense that carried an id, in input
            order (processed ones included).

    Raises:
        ValueError / TypeError: if an offense id is present but not
            convertible to int.
    """
    payload = []
    all_seen = []
    max_start = 0

    for off in offenses:
        oid = off.get("id")
        if oid is None:
            # Without an id the offense cannot be de-duplicated; skip it.
            continue
        oid = int(oid)
        all_seen.append(oid)

        try:
            start_ms = int(off.get("start_time", 0))
        except (TypeError, ValueError):
            start_ms = 0
        # The cursor advances over already-processed offenses as well.
        max_start = max(max_start, start_ms)

        if oid in processed_lookup:
            continue

        try:
            sev = SEVERITY_MAP.get(int(off.get("severity") or 0), "Medium")
        except (TypeError, ValueError):
            sev = "Medium"

        # QRadar may send "description": null. Normalise once so neither the
        # title nor the details slice is ever applied to None.
        raw_desc = off.get("description")
        title_desc = "no description" if raw_desc is None else str(raw_desc)
        details = "" if raw_desc is None else str(raw_desc)

        occurred = None
        if start_ms:
            occurred = datetime.fromtimestamp(start_ms / 1000.0, tz=timezone.utc).isoformat()

        payload.append({
            "name": f"QRadar Offense {oid}: {title_desc[:120]}",
            "type": "QRadar Offense",
            "severity": sev,
            "occurred": occurred,
            "details": details[:1000],
            "rawJSON": json.dumps(off),
            "labels": [
                {"type": "qradar_offense_id", "value": str(oid)},
                {"type": "magnitude", "value": str(off.get("magnitude", ""))},
            ],
        })

    return payload, max_start, all_seen


# ============================================================
# SECTION 5: TARGET SYSTEM CALLS (XSOAR)
# ============================================================
# Headers sent with every XSOAR request: the API key goes in Authorization
# and the key ID in x-xdr-auth-id.
XSOAR_HEADERS = {
    "Authorization": XSOAR_API_KEY,
    "x-xdr-auth-id": XSOAR_API_KEY_ID,
    "Content-Type": "application/json",
    "Accept": "application/json",
}


def deliver(incidents):
    """POST a batch of incidents to XSOAR.

    Args:
        incidents: list of incident dicts; an empty list is a no-op.

    Returns:
        True when there was nothing to send or XSOAR answered with a status
        below 300; False on any transport error or status >= 300. Failures
        are logged rather than raised so the caller can leave its state
        untouched and retry on the next run.
    """
    if not incidents:
        return True
    # The batch is sent as a JSON object with the list under "incidents".
    # NOTE(review): confirm this body shape against the target XSOAR version.
    try:
        resp = requests.post(f"{XSOAR_URL}/incident/batch",
                             headers=XSOAR_HEADERS,
                             data=json.dumps({"incidents": incidents}),
                             verify=VERIFY_TLS, timeout=120)
    except requests.RequestException as exc:
        # Timeouts / connection errors take the same "failed" path as an
        # HTTP error status instead of escaping as an unhandled traceback.
        log.error("XSOAR delivery failed: %s", exc)
        return False
    if resp.status_code >= 300:
        log.error("XSOAR delivery failed: %s %s", resp.status_code, resp.text[:300])
        return False
    return True


# ============================================================
# SECTION 6: RINOX
# ============================================================
# Run identifier: the current Unix time in whole seconds, as a string.
# Computed and logged once at import time.
RINOX_RUN_ID = str(int(time.time()))
log.info("Rinox run started: id=%s", RINOX_RUN_ID)


# ============================================================
# SECTION 7: MAIN ORCHESTRATOR
# ============================================================
def main():
    """Run one sync pass: fetch offenses, translate, deliver, persist state.

    Exits with status 1 when the QRadar fetch or the XSOAR delivery fails;
    in both cases the state file is left untouched so the next run retries
    from the same cursor. State is only saved when QRadar returned at least
    one offense.
    """
    state = load_state()
    cursor = state["last_start_time"]
    log.info("Fetching offenses with start_time>%s", cursor)

    try:
        offenses = fetch_offenses(cursor)
    except requests.RequestException as e:
        log.error("QRadar fetch failed: %s", e)
        sys.exit(1)

    if not offenses:
        log.info("No new offenses.")
        return

    known = state["processed_lookup"]
    payload, new_cursor, all_seen = translate(offenses, known)

    # Only contact XSOAR when there is something new to create.
    if payload and not deliver(payload):
        log.error("Delivery failed — state NOT updated, will retry next run")
        sys.exit(1)

    # Commit: the cursor only ever moves forward, and every offense seen in
    # this pass is remembered exactly once.
    state["last_start_time"] = max(cursor, new_cursor)
    history = state["processed_ids"]
    for sid in all_seen:
        if sid in known:
            continue
        history.append(sid)
        known.add(sid)
    save_state(state)

    if payload:
        log.info("Run complete: created=%d cursor=%s", len(payload), new_cursor)


if __name__ == "__main__":
    main()

Useful?

Used by 0 teams · Viewed 4 times · Last validated 5/17/2026