ClickHouse Korea Collateral
ClickHouse Korea Collateral
/
🌐
보안 테스트 트래픽 분석 (MV, Aggregation, Vector Search)
보안 테스트 트래픽 분석 (MV, Aggregation, Vector Search)
🌐

보안 테스트 트래픽 분석 (MV, Aggregation, Vector Search)

ClickHouse 분류
Case Study
Type
Lab
작성자

Ken

보안 테스트를 수행하다 보면 수많은 HTTP 트래픽 데이터가 쌓입니다. 침투 테스트, 레드팀 훈련, 외부 보안 검증 등 다양한 보안 활동에서 발생하는 이 데이터를 효과적으로 분석하면 보안 이벤트 검증 자동화, 이상 트래픽 탐지, 그리고 민감정보 보호까지 한 번에 해결할 수 있습니다.

이 글에서는 ClickHouse를 활용하여 실시간 보안 트래픽 분석 플랫폼을 구축하는 방법을 소개합니다.

1. 배경

보안 테스트 환경의 도전 과제

침투 테스트, 레드팀 훈련, 외부 보안 검증 프로그램 등을 운영할 때 몇 가지 공통적인 도전 과제가 발생합니다:

1) 검증(Validation) 병목

  • 매일 수십~수백 건의 보안 이벤트가 발생
  • 각 이벤트의 재현 가능성을 수동으로 확인해야 함
  • "재현 불가" 이벤트 분류에 많은 시간 소요

2) 이상(Anomaly) 트래픽 식별

  • 테스트 범위를 벗어난 비정상 행위 탐지 필요
  • Bruteforce, Scanner, DDoS 등 악성 행위 식별
  • 정상 테스트 활동과 이상 행위 구분의 어려움

3) 민감정보 노출 위험

  • 테스트 과정에서 실제 사용자 데이터가 응답에 포함될 수 있음
  • JWT, API Key, 개인정보 등이 로그에 그대로 저장됨
  • 규정 준수(GDPR, 개인정보보호법) 이슈

왜 ClickHouse인가?

특징
보안 트래픽 분석에서의 활용
초고속 집계
수백만 패킷에서 실시간 위협 패턴 탐지
컬럼 지향 저장
HTTP 헤더/바디 등 특정 필드만 빠르게 스캔
Materialized View
실시간 비식별화, 자동 집계 파이프라인
정규식 함수
민감정보 패턴 탐지 및 마스킹
TTL 관리
보존 기간에 따른 자동 데이터 정리

2. 아키텍처 설계

전체 데이터 흐름

핵심 구성 요소

1) 원본 저장소 (http_packets)

  • 모든 HTTP 요청/응답을 원본 그대로 저장
  • 90일 TTL로 자동 정리
  • 포렌식 및 상세 분석용

2) 실시간 파이프라인 (Materialized Views)

  • mv_anonymize_packets: 데이터 입력 즉시 비식별화
  • mv_anomaly_detection: 1분 단위 이상 패턴 집계
  • mv_auto_validation: 5분 단위 이벤트 자동 분류

3) 분석/대응 레이어

  • Grafana 대시보드로 실시간 모니터링
  • 임계값 초과 시 자동 차단 목록 추가
  • 비식별화 데이터는 AI 학습용으로 활용

3. 스키마 설계

테이블 구조 개요

3.1 원본 패킷 테이블

설계 포인트:

  • LowCardinality: HTTP 메서드처럼 카디널리티가 낮은 필드에 적용하여 압축률 향상
  • Map 타입: 가변적인 HTTP 헤더를 유연하게 저장
  • TTL: 90일 후 자동 삭제로 스토리지 관리

4. 유즈케이스 1: 보안 이벤트 시퀀스 분석 + 패치 검증

처리 흐름

4.1 공격 시퀀스 추출

-- SQL Injection 이벤트의 전체 요청 시퀀스
SELECT
    timestamp,
    request_method,
    request_uri,
    response_status,
    substring(response_body, 1, 80) as response_preview
FROM security_analytics.http_packets
WHERE event_id = 'SEC-2024-1234'
ORDER BY timestamp;

결과:

timestamp
request_uri
response_status
response_preview
2026-01-29 08:24:22
/api/users/1
200
{"id": 1, "name": "John"}
2026-01-29 08:24:52
/api/users/1' OR '1'='1
500
{"error": "SQL syntax error..."}
2026-01-29 08:25:22
/api/users/1 UNION SELECT...
500
{"error": "SQL syntax error..."}
2026-01-29 08:25:52
/api/users/1;SELECT password...
200
{"password_hash": "5f4d..."} 🔴
2026-01-30 08:24:22
/api/users/1' OR '1'='1
400
{"error": "Invalid input"} ✅
2026-01-30 08:24:52
/api/users/1;SELECT password...
400
{"error": "Invalid input"} ✅

4.2 자동 검증 판정

4.3 패치 전후 회귀 검증

결과:

phase
requests
vuln_triggered
status
1️⃣ BEFORE_PATCH
4
3
❌ 취약점 존재
2️⃣ AFTER_PATCH
2
0
✅ 취약점 수정됨

5. 유즈케이스 2: 이상 트래픽 실시간 탐지·차단

이상 유형별 탐지 로직

5.1 실시간 이상 탐지 쿼리

결과:

source_ip
tester_id
request_count
unique_endpoints
avg_response_ms
anomaly_type
192.168.100.100
tester-001
200
1
74
🔐 BRUTEFORCE
192.168.200.200
tester-002
150
16
80
✅ NORMAL
192.168.150.150
tester-003
100
100
69
🔍 SCANNER
192.168.250.250
tester-004
80
5
9728
💥 EDoS

5.2 Bruteforce 패턴 상세 분석

SELECT
    toStartOfMinute(timestamp) as minute,
    count() as attempts,
    countIf(response_status = 401) as failed,
    countIf(response_status = 200) as success,
    if(countIf(response_status = 200) > 0,
       '⚠️ 인증 우회 성공!', '🔒 차단됨') as status
FROM security_analytics.http_packets
WHERE session_id = 'bruteforce-session-001'
GROUP BY minute
ORDER BY minute;

결과:

minute
attempts
failed
success
status
08:19:00
18
18
0
🔒 차단됨
08:20:00
60
60
0
🔒 차단됨
08:21:00
60
60
0
🔒 차단됨
08:22:00
60
59
1
⚠️ 인증 우회 성공!
08:23:00
2
2
0
🔒 차단됨

6. 유즈케이스 3: 응답 데이터 자동 비식별화

비식별화 파이프라인

6.1 비식별화 Materialized View

6.2 원본 vs 비식별화 비교

결과:

type
session_id
tester_id
data
🔓 원본
pii-session-001
researcher-bob
{"token": "eyJhbGciOiJIUzI1...", "email": "john@example.com"}
🔒 비식별화
pii-session-001
71003D3985D307B1
{"token": "[JWT_REDACTED]", "email": "[EMAIL_REDACTED]"}
🔓 원본
pii-session-003
researcher-charlie
{"card_number": "4111-1111-1111-1111", "email": "test@test.com"}
🔒 비식별화
pii-session-003
6EFE229F10E8189C
{"card_number": "[CARD_REDACTED]", "email": "[EMAIL_REDACTED]"}

6.3 PII 노출 통계

결과:

pii_type
exposure_count
affected_sessions
severity
EMAIL
18
4
🟠 HIGH
PHONE_KR
2
2
🟠 HIGH
API_KEY
2
2
🔴 CRITICAL
JWT
1
1
🔴 CRITICAL
CREDIT_CARD
1
1
🔴 CRITICAL

7. 유즈케이스 4: Vector Search를 활용한 지능형 분석

왜 Vector Search인가?

기존 정규식 기반 탐지는 알려진 패턴만 잡아낼 수 있습니다. 하지만 공격자들은 끊임없이 패턴을 변형합니다:

-- 기존 정규식으로 탐지 가능
' OR '1'='1

-- 변형된 패턴 (정규식 우회)
' OR 'x'='x
' OR 1=1--
'/**/OR/**/1=1

Vector Search는 텍스트를 의미 기반 벡터로 변환하여, 정확히 일치하지 않아도 유사한 공격 패턴을 탐지할 수 있습니다.

Vector Search 아키텍처

7.1 공격 시그니처 테이블

알려진 취약점 패턴(CWE)을 벡터로 저장하여 유사 공격을 탐지합니다.

샘플 데이터 구조:

pattern_name
category
cwe_id
sample_payload
Basic SQL Injection
SQLi
CWE-89
' OR '1'='1
Union-based SQLi
SQLi
CWE-89
' UNION SELECT * FROM users--
Reflected XSS
XSS
CWE-79
<script>alert(1)</script>
Path Traversal
LFI
CWE-22
../../../etc/passwd

7.2 요청 임베딩 테이블

HTTP 요청을 벡터로 변환하여 저장합니다.

7.3 유사 공격 패턴 탐지

새로운 요청이 알려진 공격 패턴과 얼마나 유사한지 실시간으로 확인합니다.

결과 예시:

pattern_name
category
cwe_id
severity
similarity_score
match_level
Union-based SQLi
SQLi
CWE-89
CRITICAL
0.1523
🔴 매우 유사
Basic SQL Injection
SQLi
CWE-89
HIGH
0.2841
🟠 유사
Blind SQLi
SQLi
CWE-89
HIGH
0.3127
🟠 유사
Error-based SQLi
SQLi
CWE-89
MEDIUM
0.4892
🟡 관련 있음
XSS Reflected
XSS
CWE-79
MEDIUM
0.7234
🟢 관련 없음

7.4 중복 이벤트 탐지

새 보안 이벤트가 제출될 때 기존 이벤트와의 유사도를 자동으로 확인합니다.

중복 탐지 쿼리:

결과 예시:

event_id
title
status
priority
similarity
duplicate_check
SEC-2024-0892
Login SQL Injection
RESOLVED
CRITICAL
0.1834
⚠️ 중복 가능성 높음
SEC-2024-0756
Auth Bypass via SQLi
RESOLVED
HIGH
0.3421
✅ 신규 이벤트
SEC-2024-0623
User Enumeration
VERIFIED
MEDIUM
0.6782
✅ 신규 이벤트

7.5 시맨틱 검색

자연어로 과거 보안 이벤트를 검색할 수 있습니다.

-- "JWT 토큰 조작으로 인한 인증 우회" 관련 이벤트 검색
SELECT
    event_id,
    title,
    vulnerability_type,
    status,
    priority,
    round(cosineDistance(content_embedding, [/* query embedding */]), 4) as relevance
FROM security_analytics.event_knowledge_base
ORDER BY relevance ASC
LIMIT 10;

검색 예시:

검색어
결과
"JWT manipulation authentication bypass"
JWT 관련 인증 우회 이벤트
"sensitive data exposure in API response"
API 응답의 민감정보 노출 이벤트
"rate limiting bypass"
Rate Limit 우회 관련 이벤트

7.6 임베딩 파이프라인 구현

실제 운영 환경에서는 Python 기반 파이프라인으로 임베딩을 생성합니다.

7.7 Vector Search 활용 효과

지표
기존
Vector Search 적용
신규 공격 패턴 탐지율
60%
85%
중복 이벤트 자동 탐지
수동
자동 (95% 정확도)
검색 정확도
키워드 의존
의미 기반
검증 시간
30분/건
5분/건

7.8 주의사항 및 Best Practices

1) 임베딩 비용 관리

2) 인덱스 튜닝

-- HNSW 파라미터 조정 (정확도 vs 속도 트레이드오프)
INDEX idx_embedding embedding
    TYPE vector_similarity('hnsw', 'cosineDistance', 'ef_construction=128, m=16')

3) 임계값 설정

유사도
해석
액션
< 0.2
거의 동일
자동 차단/중복 처리
0.2 ~ 0.4
매우 유사
우선 검토
0.4 ~ 0.6
관련 있음
참고용
> 0.6
관련 없음
무시

8. 결론 및 확장 방안

핵심 요약

유즈케이스
ClickHouse 활용
기대 효과
보안 이벤트 시퀀스 분석
시퀀스 추출 + 패턴 매칭
검증 시간 80% 단축
이상 트래픽 탐지
Refreshable MV + 점수 기반
실시간 이상 행위 차단
응답 데이터 비식별화
정규식 + 실시간 MV
규정 준수 + AI 활용
Vector Search
임베딩 + 유사도 검색
변형 공격 탐지 + 중복 이벤트 자동화

확장 방안

마무리

ClickHouse의 강력한 집계 성능과 Materialized View를 활용하면, 대규모 보안 테스트 트래픽에서도 실시간으로 이상 행위를 탐지하고 민감정보를 보호할 수 있습니다.

특히 Refreshable Materialized View는 별도의 스케줄러 없이 주기적인 배치 작업을 ClickHouse 내부에서 처리할 수 있어, 아키텍처를 단순하게 유지하면서도 복잡한 분석 파이프라인을 구축할 수 있습니다.

여기에 Vector Search를 더하면 정규식으로 잡지 못하는 변형 공격 패턴을 탐지하고, 중복 이벤트를 자동으로 걸러내며, 자연어로 과거 보안 이벤트를 검색하는 지능형 분석이 가능해집니다. ClickHouse 25.8부터 GA된 Vector Search 기능은 별도의 벡터 DB 없이 단일 플랫폼에서 정형 데이터 분석과 시맨틱 검색을 모두 처리할 수 있게 해줍니다.

clickhouse-hols/usecase/security-traffic-analysis at main · litkhai/clickhouse-hols

ClickHouse Hands-on Labs . Contribute to litkhai/clickhouse-hols development by creating an account on GitHub.

github.com

clickhouse-hols/usecase/security-traffic-analysis at main · litkhai/clickhouse-hols
CREATE TABLE security_analytics.http_packets
(
    -- 기본 식별 정보
    packet_id UUID DEFAULT generateUUIDv4(),
    session_id String,
    event_id String COMMENT '보안 이벤트 ID',
    tester_id String COMMENT '테스터 ID',

    -- 시간 정보
    timestamp DateTime64(3) DEFAULT now64(3),

    -- 요청 정보
    request_method LowCardinality(String),
    request_uri String,
    request_headers Map(String, String),
    request_body String,

    -- 응답 정보
    response_status UInt16,
    response_headers Map(String, String),
    response_body String,
    response_time_ms UInt32,

    -- 네트워크 정보
    source_ip String,
    dest_ip String,

    -- 메타데이터
    content_length UInt64 DEFAULT 0,
    user_agent String DEFAULT '',

    -- 파티셔닝용
    event_date Date DEFAULT toDate(timestamp)
)
ENGINE = MergeTree()
PARTITION BY toYYYYMM(event_date)
ORDER BY (session_id, timestamp, packet_id)
TTL event_date + INTERVAL 90 DAY;
SELECT
    event_id,
    count() as total_requests,
    countIf(response_status >= 500) as server_errors,
    countIf(response_body LIKE '%password%') as sensitive_exposed,

    -- 자동 판정
    multiIf(
        countIf(response_body LIKE '%password%') > 0,
            '🔴 CRITICAL - 민감정보 노출',
        countIf(response_status >= 500) > 0,
            '🟠 HIGH - 서버 에러 유발',
        '🟢 LOW - 추가 검토 필요'
    ) as verdict
FROM security_analytics.http_packets
WHERE event_id = 'SEC-2024-1234'
GROUP BY event_id;
SELECT
    if(timestamp < now() - INTERVAL 36 HOUR,
       '1️⃣ BEFORE_PATCH', '2️⃣ AFTER_PATCH') as phase,
    count() as requests,
    countIf(response_body LIKE '%password_hash%') as vuln_triggered,
    if(countIf(response_body LIKE '%password_hash%') > 0,
       '❌ 취약점 존재', '✅ 취약점 수정됨') as status
FROM security_analytics.http_packets
WHERE event_id = 'SEC-2024-1234'
GROUP BY phase
ORDER BY phase;
SELECT
    source_ip,
    tester_id,
    count() as request_count,
    uniq(request_uri) as unique_endpoints,
    round(avg(response_time_ms), 0) as avg_response_ms,

    multiIf(
        count() > 100 AND uniq(request_uri) < 5, '🔐 BRUTEFORCE',
        uniq(request_uri) > 30, '🔍 SCANNER',
        avg(response_time_ms) > 5000, '💥 EDoS',
        '✅ NORMAL'
    ) as anomaly_type
FROM security_analytics.http_packets
WHERE timestamp >= now() - INTERVAL 1 HOUR
GROUP BY source_ip, tester_id
HAVING count() > 10
ORDER BY request_count DESC;
CREATE MATERIALIZED VIEW security_analytics.mv_anonymize_packets
TO security_analytics.http_packets_anonymized
AS
SELECT
    packet_id,
    session_id,
    event_id,
    hex(SHA256(tester_id)) as tester_id_hash,
    timestamp,
    request_method,

    -- Response Body 비식별화
    replaceRegexpAll(
        replaceRegexpAll(
            replaceRegexpAll(
                replaceRegexpAll(
                    replaceRegexpAll(response_body,
                        -- JWT 토큰
                        'eyJ[A-Za-z0-9_-]{10,}\\.[A-Za-z0-9_-]{10,}\\.[A-Za-z0-9_-]{10,}',
                        '[JWT_REDACTED]'),
                    -- API Key
                    '"[Aa]pi[_-]?[Kk]ey"\\s*:\\s*"[^"]+"',
                    '"api_key":"[REDACTED]"'),
                -- 이메일
                '[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z]{2,}',
                '[EMAIL_REDACTED]'),
            -- 한국 전화번호
            '01[0-9]-?[0-9]{3,4}-?[0-9]{4}',
            '[PHONE_REDACTED]'),
        -- 신용카드 번호
        '\\b[0-9]{4}[- ]?[0-9]{4}[- ]?[0-9]{4}[- ]?[0-9]{4}\\b',
        '[CARD_REDACTED]'
    ) as response_body_sanitized,

    -- 탐지된 PII 유형
    arrayFilter(x -> x != '', [
        if(match(response_body, 'eyJ[A-Za-z0-9_-]{10,}'), 'JWT', ''),
        if(match(response_body, '"[Aa]pi[_-]?[Kk]ey"'), 'API_KEY', ''),
        if(match(response_body, '[a-zA-Z0-9._%+-]+@'), 'EMAIL', ''),
        if(match(response_body, '01[0-9]-?[0-9]{3,4}-?[0-9]{4}'), 'PHONE_KR', ''),
        if(match(response_body, '\\b[0-9]{4}[- ]?[0-9]{4}[- ]?[0-9]{4}'), 'CREDIT_CARD', '')
    ]) as detected_pii_types,

    event_date
FROM security_analytics.http_packets;
SELECT
    '🔓 원본' as type,
    session_id,
    tester_id,
    substring(response_body, 1, 100) as data
FROM security_analytics.http_packets
WHERE session_id LIKE 'pii-session%'

UNION ALL

SELECT
    '🔒 비식별화' as type,
    session_id,
    substring(tester_id_hash, 1, 16) as tester_id,
    substring(response_body_sanitized, 1, 100) as data
FROM security_analytics.http_packets_anonymized
WHERE session_id LIKE 'pii-session%'
ORDER BY session_id, type;
SELECT
    arrayJoin(detected_pii_types) as pii_type,
    count() as exposure_count,
    uniq(session_id) as affected_sessions,
    multiIf(
        pii_type IN ('JWT', 'API_KEY', 'CREDIT_CARD'), '🔴 CRITICAL',
        pii_type IN ('EMAIL', 'PHONE_KR'), '🟠 HIGH',
        '🟡 MEDIUM'
    ) as severity
FROM security_analytics.http_packets_anonymized
WHERE length(detected_pii_types) > 0
GROUP BY pii_type
ORDER BY exposure_count DESC;
CREATE TABLE security_analytics.attack_signatures
(
    signature_id UUID DEFAULT generateUUIDv4(),

    -- 패턴 메타데이터
    pattern_name String,
    pattern_description String,
    category LowCardinality(String),  -- SQLi, XSS, SSRF, IDOR 등

    -- 취약점 분류
    cwe_id String,           -- CWE-89, CWE-79 등
    cvss_score Float32,      -- 0.0 ~ 10.0
    severity LowCardinality(String),

    -- 샘플 페이로드
    sample_payload String,

    -- Vector Embedding (1536 dimensions)
    payload_embedding Array(Float32),

    -- Vector Index (HNSW 알고리즘)
    INDEX idx_payload_embedding payload_embedding
        TYPE vector_similarity('hnsw', 'cosineDistance')
        GRANULARITY 100000000,

    created_at DateTime DEFAULT now()
)
ENGINE = MergeTree
ORDER BY (category, signature_id);
CREATE TABLE security_analytics.request_embeddings
(
    packet_id UUID,

    -- 임베딩 대상 (정규화된 요청)
    normalized_request String,  -- method + uri + body 조합

    -- Vector Embedding
    request_embedding Array(Float32),

    INDEX idx_request_embedding request_embedding
        TYPE vector_similarity('hnsw', 'cosineDistance')
        GRANULARITY 100000000,

    embedding_model String DEFAULT 'text-embedding-3-small',
    created_at DateTime DEFAULT now()
)
ENGINE = MergeTree
ORDER BY packet_id;
-- 특정 요청과 유사한 공격 패턴 찾기
SELECT
    s.pattern_name,
    s.category,
    s.cwe_id,
    s.severity,
    s.sample_payload,
    round(cosineDistance(r.request_embedding, s.payload_embedding), 4) as similarity_score,

    multiIf(
        cosineDistance(r.request_embedding, s.payload_embedding) < 0.2, '🔴 매우 유사',
        cosineDistance(r.request_embedding, s.payload_embedding) < 0.4, '🟠 유사',
        cosineDistance(r.request_embedding, s.payload_embedding) < 0.6, '🟡 관련 있음',
        '🟢 관련 없음'
    ) as match_level
FROM security_analytics.request_embeddings r
CROSS JOIN security_analytics.attack_signatures s
WHERE r.packet_id = 'target-packet-uuid'
ORDER BY similarity_score ASC
LIMIT 5;
CREATE TABLE security_analytics.event_knowledge_base
(
    event_id String,

    -- 이벤트 내용
    title String,
    description String,
    vulnerability_type LowCardinality(String),

    -- 상태
    status LowCardinality(String),  -- SUBMITTED, VERIFIED, RESOLVED, DUPLICATE
    priority LowCardinality(String),

    -- Vector Embedding
    content_embedding Array(Float32),

    INDEX idx_content_embedding content_embedding
        TYPE vector_similarity('hnsw', 'cosineDistance')
        GRANULARITY 100000000,

    created_at DateTime DEFAULT now()
)
ENGINE = ReplacingMergeTree(created_at)
ORDER BY event_id;
-- 새 이벤트와 유사한 기존 이벤트 찾기
WITH new_event_embedding AS (
    -- 실제로는 API에서 임베딩 생성 후 전달
    SELECT [0.1, 0.2, ...] as embedding
)
SELECT
    event_id,
    title,
    vulnerability_type,
    status,
    priority,
    round(cosineDistance(content_embedding,
        (SELECT embedding FROM new_event_embedding)), 4) as similarity,

    if(cosineDistance(content_embedding,
        (SELECT embedding FROM new_event_embedding)) < 0.25,
        '⚠️ 중복 가능성 높음',
        '✅ 신규 이벤트') as duplicate_check
FROM security_analytics.event_knowledge_base
WHERE status IN ('RESOLVED', 'VERIFIED')
ORDER BY similarity ASC
LIMIT 5;
import clickhouse_connect
from openai import OpenAI

# 클라이언트 초기화
ch_client = clickhouse_connect.get_client(host='...', password='...')
openai_client = OpenAI()

def get_embedding(text: str) -> list[float]:
    """텍스트를 벡터로 변환"""
    response = openai_client.embeddings.create(
        model="text-embedding-3-small",
        input=text[:8000]  # 토큰 제한
    )
    return response.data[0].embedding

def embed_http_request(packet_id: str, method: str, uri: str, body: str):
    """HTTP 요청 임베딩 생성 및 저장"""
    normalized = f"{method} {uri}\n{body[:2000]}"
    embedding = get_embedding(normalized)

    ch_client.insert(
        'security_analytics.request_embeddings',
        [[packet_id, normalized, embedding]],
        column_names=['packet_id', 'normalized_request', 'request_embedding']
    )

def find_similar_attacks(packet_id: str, threshold: float = 0.4) -> list:
    """유사 공격 패턴 검색"""
    query = f"""
    SELECT
        s.pattern_name,
        s.cwe_id,
        s.severity,
        cosineDistance(r.request_embedding, s.payload_embedding) as score
    FROM security_analytics.request_embeddings r
    CROSS JOIN security_analytics.attack_signatures s
    WHERE r.packet_id = '{packet_id}'
      AND cosineDistance(r.request_embedding, s.payload_embedding) < {threshold}
    ORDER BY score ASC
    LIMIT 5
    """
    return ch_client.query(query).result_rows

def check_duplicate_event(title: str, description: str) -> list:
    """중복 이벤트 검사"""
    content = f"{title}\n{description}"
    embedding = get_embedding(content)

    query = f"""
    SELECT
        event_id,
        title,
        status,
        cosineDistance(content_embedding, {embedding}) as similarity
    FROM security_analytics.event_knowledge_base
    WHERE cosineDistance(content_embedding, {embedding}) < 0.3
    ORDER BY similarity ASC
    LIMIT 3
    """
    return ch_client.query(query).result_rows
# 배치 처리로 API 호출 최소화
def batch_embed(texts: list[str], batch_size: int = 100):
    embeddings = []
    for i in range(0, len(texts), batch_size):
        batch = texts[i:i+batch_size]
        response = openai_client.embeddings.create(
            model="text-embedding-3-small",
            input=batch
        )
        embeddings.extend([e.embedding for e in response.data])
    return embeddings