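Testing PING and port scans from a web page with Python
The server listens on port 9999.
Install Flask first: pip install flask
The full source file is below.
Scan results can be downloaded as CSV or JSON.
Ports can be given as a list or as a range (for example 1-100).
Port scans also run concurrently.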
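The page in the source parses the port string in the browser (its `parsePortsLocal` function accepts input such as `80,443` or `1-100`). If the same parsing were needed on the server side, a minimal Python sketch could look like the following. `parse_ports` is a hypothetical helper that is not part of the source, and it is slightly stricter than the browser version because `int()` rejects input such as `80abc`.

```python
def parse_ports(spec: str) -> list[int]:
    """Expand a spec such as "80,443" or "1-100" into a sorted list of unique ports."""
    ports: set[int] = set()
    for part in spec.split(","):
        part = part.strip()
        if not part:
            continue
        if "-" in part:
            pieces = part.split("-")
            if len(pieces) != 2:
                continue  # skip malformed ranges such as "1-2-3"
            try:
                a, b = int(pieces[0]), int(pieces[1])
            except ValueError:
                continue  # skip non-numeric ranges
            start, end = min(a, b), max(a, b)
        else:
            try:
                start = end = int(part)
            except ValueError:
                continue  # skip non-numeric entries
        # Clamp to the valid TCP port range 1..65535 before expanding.
        ports.update(range(max(start, 1), min(end, 65535) + 1))
    return sorted(ports)
```

Reversed ranges such as `100-98` are normalized, and entries outside 1..65535 are dropped silently, which mirrors what the browser-side parser does.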
============ Source ===================
import subprocess
import platform
import re
import time
import socket
from datetime import datetime
from flask import Flask, request, render_template_string, jsonify
app = Flask(__name__)
# ---------------- HTML template ----------------
PAGE_TEMPLATE = """
<!doctype html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>Ping + Port Scan Test</title>
<style>
body {
font-family: Arial, sans-serif;
max-width: 900px;
margin: 40px auto;
}
h1 {
text-align: center;
}
form {
margin-bottom: 20px;
text-align: center;
}
input[type="text"] {
width: 280px;
padding: 8px;
font-size: 14px;
}
input[type="number"] {
width: 80px;
padding: 6px;
font-size: 14px;
}
input[type="submit"], button {
padding: 8px 16px;
font-size: 14px;
cursor: pointer;
margin-left: 6px;
}
.result-box {
border: 1px solid #ddd;
padding: 15px;
border-radius: 8px;
background: #f9f9f9;
margin-top: 20px;
}
.status-ok {
color: green;
font-weight: bold;
}
.status-fail {
color: red;
font-weight: bold;
}
pre {
background: #222;
color: #eee;
padding: 10px;
overflow-x: auto;
border-radius: 6px;
font-size: 12px;
}
table {
border-collapse: collapse;
width: 100%;
margin-top: 10px;
}
table th, table td {
border: 1px solid #ccc;
padding: 6px;
text-align: center;
font-size: 13px;
}
table th {
background: #eee;
}
.small-text {
font-size: 12px;
color: #666;
}
#continuous-area {
margin-top: 20px;
}
#continuous-log {
max-height: 260px;
overflow-y: auto;
border: 1px solid #ddd;
border-radius: 6px;
padding: 6px;
background: #111;
color: #eee;
font-size: 12px;
}
#continuous-log div {
margin-bottom: 2px;
}
#port-scan-area {
margin-top: 20px;
}
#port-scan-log {
font-size: 12px;
margin-top: 4px;
white-space: pre-wrap;
}
#port-progress {
font-size: 12px;
margin-top: 4px;
}
#open-port-summary {
font-size: 13px;
margin-top: 8px;
}
</style>
</head>
<body>
<h1>Ping + Port Scan Test (port 9999)</h1>
<!-- Form: target + ping count + continuous mode + port string -->
<form id="ping-form" method="POST" action="/run_test">
  <div style="margin-bottom: 8px;">
    <input type="text" name="target" id="target"
           placeholder="Domain or IP (e.g. 8.8.8.8)"
           value="{{ target or '' }}">
  </div>
  <div style="margin-bottom: 8px;">
    <label>
      Ping count:
      <input type="number" name="count" id="count" min="1" value="{{ count or 4 }}">
    </label>
    <label style="margin-left: 10px;">
      <input type="checkbox" name="continuous" id="continuous"
             {% if continuous %}checked{% endif %}>
      Continuous (ping every second)
    </label>
  </div>
  <div style="margin-bottom: 10px;">
    <label>
      Ports to scan (e.g. 80,443 or 1-100):
      <input type="text" name="ports" id="ports" value="{{ ports or '' }}">
    </label>
  </div>
  <input type="submit" value="Run Ping">
  <button type="button" id="stop-btn" disabled>Stop continuous mode</button>
</form>
<div class="small-text">
  • Run Ping: the server pings the target the given number of times and shows a summary<br>
  • Continuous mode: the browser calls /api/ping_once every second and appends a live ping log<br>
  • Port scan: started with the separate button below (several ports are scanned concurrently, with progress, a summary, and downloads)
</div>
{% if summary %}
<!-- Ping results -->
<div class="result-box">
  <h3>Ping result summary ({{ summary.total }} runs)</h3>
  <p>Target: <b>{{ target }}</b></p>
  <p>
    Succeeded: <b>{{ summary.success }}</b>,
    Failed: <b>{{ summary.fail }}</b>,
    Packet loss: <b>{{ summary.loss_rate }}%</b>
  </p>
  <p>
    Ping time (ms), successful packets only:<br>
    Min: <b>{{ summary.min_ms }}</b>,
    Max: <b>{{ summary.max_ms }}</b>,
    Avg: <b>{{ summary.avg_ms }}</b>
  </p>
  <table>
    <thead>
      <tr>
        <th>No.</th>
        <th>Time</th>
        <th>Status</th>
        <th>Ping time (ms)</th>
      </tr>
    </thead>
    <tbody>
      {% for row in detail %}
      <tr>
        <td>{{ loop.index }}</td>
        <td>{{ row.timestamp }}</td>
        <td>
          {% if row.success %}
          <span class="status-ok">OK</span>
          {% else %}
          <span class="status-fail">FAIL</span>
          {% endif %}
        </td>
        <td>
          {% if row.latency_ms is none %}
          Ping failed
          {% else %}
          {{ "%.1f"|format(row.latency_ms) }}
          {% endif %}
        </td>
      </tr>
      {% endfor %}
    </tbody>
  </table>
  {% if raw_output %}
  <details>
    <summary>Show raw output of the last ping</summary>
    <pre>{{ raw_output }}</pre>
  </details>
  {% endif %}
</div>
{% endif %}
<!-- Port scan area -->
<div id="port-scan-area" class="result-box">
  <h3>Port scan (concurrent progress + summary + download)</h3>
  <p class="small-text">
    • Enter the target and the port list, then click [Start port scan]<br>
    • Several ports are scanned at once while the progress and the result table update live.<br>
    • After the scan, a summary of open ports is shown and CSV / JSON downloads become available
  </p>
  <div style="margin-bottom: 8px;">
    <label>
      Concurrency:
      <input type="number" id="concurrency" min="1" max="200" value="{{ concurrency or 20 }}">
    </label>
    <span class="small-text"> (maximum number of ports scanned at the same time; default 20, maximum 200)</span>
  </div>
  <button type="button" id="port-scan-start-btn">Start port scan</button>
  <button type="button" id="port-scan-stop-btn" disabled>Stop port scan</button>
  <button type="button" id="download-csv-btn" disabled>Download CSV</button>
  <button type="button" id="download-json-btn" disabled>Download JSON</button>
  <div id="port-progress"></div>
  <div id="port-scan-log"></div>
  <div id="open-port-summary"></div>
  <table>
    <thead>
      <tr>
        <th>Port</th>
        <th>Status</th>
        <th>Note</th>
      </tr>
    </thead>
    <tbody id="port-table-body">
    </tbody>
  </table>
</div>
<!-- Continuous mode area -->
<div id="continuous-area">
  <h3>Continuous mode: live ping log</h3>
  <div class="small-text">
    • Check "Continuous" above and press Run Ping; a ping result is then appended below every second.<br>
    • Press "Stop continuous mode" to stop.
  </div>
  <div id="continuous-log"></div>
</div>
<script>
let pingIntervalId = null;
const form = document.getElementById('ping-form');
const continuousCheckbox = document.getElementById('continuous');
const stopBtn = document.getElementById('stop-btn');
const targetInput = document.getElementById('target');
const countInput = document.getElementById('count');
const logDiv = document.getElementById('continuous-log');
const portScanStartBtn = document.getElementById('port-scan-start-btn');
const portScanStopBtn = document.getElementById('port-scan-stop-btn');
const portsInput = document.getElementById('ports');
const portProgressDiv = document.getElementById('port-progress');
const portScanLogDiv = document.getElementById('port-scan-log');
const portTableBody = document.getElementById('port-table-body');
const concurrencyInput = document.getElementById('concurrency');
const openPortSummaryDiv = document.getElementById('open-port-summary');
const downloadCsvBtn = document.getElementById('download-csv-btn');
const downloadJsonBtn = document.getElementById('download-json-btn');
let portScanAbort = false;
let portResults = []; // {port, open, note}
function addLogLine(text, isOk) {
  const line = document.createElement('div');
  if (isOk === true) {
    line.style.color = '#7CFC00';
  } else if (isOk === false) {
    line.style.color = '#FF6347';
  } else {
    line.style.color = '#FFFFFF';
  }
  line.textContent = text;
  logDiv.appendChild(line);
  logDiv.scrollTop = logDiv.scrollHeight;
}
function startContinuousPing(target) {
  if (!target) {
    alert('Please enter a target.');
    continuousCheckbox.checked = false;
    return;
  }
  if (pingIntervalId !== null) {
    clearInterval(pingIntervalId);
  }
  addLogLine('--- Continuous mode started: ' + target + ' ---', null);
  pingIntervalId = setInterval(function() {
    fetch('/api/ping_once?target=' + encodeURIComponent(target))
      .then(function(response) { return response.json(); })
      .then(function(data) {
        const ts = data.timestamp || '';
        const ok = data.success;
        const latency = data.latency_ms;
        let msg = '[' + ts + '] ' + data.target + ' -> ';
        if (ok) {
          if (latency === null) {
            msg += '✅ OK | ping time: -';
          } else {
            msg += '✅ OK | ping time: ' + latency.toFixed(1) + ' ms';
          }
        } else {
          msg += '❌ FAIL | ping time: unknown';
        }
        addLogLine(msg, ok);
      })
      .catch(function(err) {
        addLogLine('Error: ' + err, false);
      });
  }, 1000);
  stopBtn.disabled = false;
}
function stopContinuousPing() {
  if (pingIntervalId !== null) {
    clearInterval(pingIntervalId);
    pingIntervalId = null;
    addLogLine('--- Continuous mode stopped ---', null);
  }
  stopBtn.disabled = true;
}
// Handle submission of the ping form
form.addEventListener('submit', function(ev) {
  const isContinuous = continuousCheckbox.checked;
  const target = targetInput.value.trim();
  if (isContinuous) {
    // Continuous mode stays on the page and polls /api/ping_once instead
    ev.preventDefault();
    startContinuousPing(target);
  } else {
    stopContinuousPing();
    if (!countInput.value) {
      countInput.value = 4;
    }
  }
});
stopBtn.addEventListener('click', function() {
  stopContinuousPing();
});
// ---- Port scan JS ----
function parsePortsLocal(portsStr) {
  portsStr = portsStr.trim();
  if (!portsStr) return [];
  const result = new Set();
  const parts = portsStr.split(',');
  for (let part of parts) {
    part = part.trim();
    if (!part) continue;
    // Replace typographic dashes (– — −) with an ASCII '-' in case they are typed in
    part = part.replace(/[–—−]/g, '-');
    if (part.includes('-')) {
      const rangeParts = part.split('-');
      if (rangeParts.length !== 2) continue;
      const startStr = rangeParts[0].trim();
      const endStr = rangeParts[1].trim();
      const s = parseInt(startStr, 10);
      const e = parseInt(endStr, 10);
      if (isNaN(s) || isNaN(e)) continue;
      const start = Math.min(s, e);
      const end = Math.max(s, e);
      for (let p = start; p <= end; p++) {
        if (p >= 1 && p <= 65535) result.add(p);
      }
    } else {
      const p = parseInt(part, 10);
      if (!isNaN(p) && p >= 1 && p <= 65535) result.add(p);
    }
  }
  return Array.from(result).sort((a, b) => a - b);
}
async function scanPortOnce(target, port) {
  const res = await fetch(
    '/api/scan_port?target='
    + encodeURIComponent(target)
    + '&port=' + encodeURIComponent(port)
  );
  return res.json();
}
function updateOpenPortSummary() {
  if (portResults.length === 0) {
    openPortSummaryDiv.textContent = "";
    return;
  }
  const openPorts = portResults
    .filter(r => r.open)
    .map(r => r.port)
    .sort((a, b) => a - b);
  if (openPorts.length === 0) {
    openPortSummaryDiv.textContent = "No open ports.";
  } else {
    openPortSummaryDiv.textContent =
      "Open ports (" + openPorts.length + "): " + openPorts.join(", ");
  }
}
function downloadJSON() {
  if (portResults.length === 0) {
    alert("There are no scan results to download.");
    return;
  }
  const target = targetInput.value.trim() || "target";
  const dataStr = JSON.stringify(portResults, null, 2);
  const blob = new Blob([dataStr], { type: "application/json" });
  const url = URL.createObjectURL(blob);
  const a = document.createElement("a");
  const ts = new Date().toISOString().replace(/[:.]/g, "-");
  a.href = url;
  a.download = "port_scan_" + target + "_" + ts + ".json";
  document.body.appendChild(a);
  a.click();
  document.body.removeChild(a);
  URL.revokeObjectURL(url);
}
function downloadCSV() {
  if (portResults.length === 0) {
    alert("There are no scan results to download.");
    return;
  }
  const target = targetInput.value.trim() || "target";
  // Quote a field when it contains a quote, a comma, or a newline
  function escapeCsv(value) {
    if (value == null) return "";
    const s = String(value);
    if (s.includes('"') || s.includes(',') || s.includes('\\n')) {
      return '"' + s.replace(/"/g, '""') + '"';
    }
    return s;
  }
  let csv = "port,status,note\\n";
  for (const r of portResults) {
    const status = r.open ? "OPEN" : "CLOSED";
    csv += [
      escapeCsv(r.port),
      escapeCsv(status),
      escapeCsv(r.note)
    ].join(",") + "\\n";
  }
  // Prepend a UTF-8 BOM so Excel does not garble non-ASCII text
  const bom = "\\uFEFF"; // parsed by JS as a string escape, yielding the actual BOM character
  const blob = new Blob([bom + csv], { type: "text/csv;charset=utf-8;" });
  const url = URL.createObjectURL(blob);
  const a = document.createElement("a");
  const ts = new Date().toISOString().replace(/[:.]/g, "-");
  a.href = url;
  a.download = "port_scan_" + target + "_" + ts + ".csv";
  document.body.appendChild(a);
  a.click();
  document.body.removeChild(a);
  URL.revokeObjectURL(url);
}
async function startPortScan() {
  const target = targetInput.value.trim();
  const portsStr = portsInput.value.trim();
  if (!target) {
    alert("Enter a target (domain or IP).");
    return;
  }
  if (!portsStr) {
    alert("Enter a port list.");
    return;
  }
  const ports = parsePortsLocal(portsStr);
  if (ports.length === 0) {
    alert("No valid ports. (e.g. 80,443 or 1-100)");
    return;
  }
  let concurrency = parseInt(concurrencyInput.value, 10);
  if (isNaN(concurrency) || concurrency <= 0) {
    concurrency = 20; // fall back to the default shown in the UI
  }
  if (concurrency > 200) {
    concurrency = 200;
  }
  concurrencyInput.value = concurrency;
  // Reset state
  portScanAbort = false;
  portResults = [];
  portTableBody.innerHTML = "";
  portProgressDiv.textContent = "";
  portScanLogDiv.textContent = "";
  openPortSummaryDiv.textContent = "";
  portScanStartBtn.disabled = true;
  portScanStopBtn.disabled = false;
  downloadCsvBtn.disabled = true;
  downloadJsonBtn.disabled = true;
  const total = ports.length;
  let done = 0;
  let index = 0;
  portScanLogDiv.textContent =
    `Scan started: ${target}, ${total} ports (concurrency: ${concurrency})`;
  portScanLogDiv.textContent += "\\nPort list: " + ports.join(", ");
  // Each worker takes the next unscanned port until the list is exhausted or the scan is aborted
  async function worker() {
    while (!portScanAbort) {
      if (index >= total) break;
      const currentIndex = index++;
      const port = ports[currentIndex];
      portProgressDiv.textContent =
        `Progress: ${done} / ${total} (current port: ${port})`;
      try {
        const data = await scanPortOnce(target, port);
        const tr = document.createElement("tr");
        const tdPort = document.createElement("td");
        const tdStatus = document.createElement("td");
        const tdNote = document.createElement("td");
        tdPort.textContent = port;
        if (data.open) {
          tdStatus.innerHTML = '<span class="status-ok">OPEN</span>';
        } else {
          tdStatus.innerHTML = '<span class="status-fail">CLOSED</span>';
        }
        tdNote.textContent = data.note || "";
        tr.appendChild(tdPort);
        tr.appendChild(tdStatus);
        tr.appendChild(tdNote);
        portTableBody.appendChild(tr);
        portResults.push({
          port: port,
          open: !!data.open,
          note: data.note || ""
        });
      } catch (e) {
        const tr = document.createElement("tr");
        tr.innerHTML = '<td>' + port + '</td>' +
          '<td><span class="status-fail">ERROR</span></td>' +
          '<td>' + e + '</td>';
        portTableBody.appendChild(tr);
        portResults.push({
          port: port,
          open: false,
          note: "Error: " + e
        });
      }
      done += 1;
      portProgressDiv.textContent = `Progress: ${done} / ${total}`;
    }
  }
  const workers = [];
  const numWorkers = Math.min(concurrency, total);
  for (let i = 0; i < numWorkers; i++) {
    workers.push(worker());
  }
  await Promise.all(workers);
  updateOpenPortSummary();
  if (!portScanAbort) {
    portScanLogDiv.textContent += "\\nScan complete.";
  } else {
    portScanLogDiv.textContent += "\\nScan stopped by the user.";
  }
  portScanStartBtn.disabled = false;
  portScanStopBtn.disabled = true;
  if (portResults.length > 0) {
    downloadCsvBtn.disabled = false;
    downloadJsonBtn.disabled = false;
  }
}
function stopPortScan() {
  portScanAbort = true;
  portScanStopBtn.disabled = true;
  portScanLogDiv.textContent += "\\nStop requested...";
}
portScanStartBtn.addEventListener("click", startPortScan);
portScanStopBtn.addEventListener("click", stopPortScan);
downloadCsvBtn.addEventListener("click", downloadCSV);
downloadJsonBtn.addEventListener("click", downloadJSON);
</script>
</body>
</html>
"""


# ---------------- Ping helpers ----------------
def parse_latency(output: str):
    """
    Extract the latency in ms from ping output (matches time=XXms / time=XX ms).
    """
    pattern = r"time[=<]\s*([0-9]+(?:\.[0-9]+)?)\s*ms"
    match = re.search(pattern, output, re.IGNORECASE)
    if match:
        try:
            return float(match.group(1))
        except ValueError:
            return None
    return None


def ping_once(target: str, timeout_ms: int = 1000):
    """
    Run ping once and return (success, latency_ms, raw_output).
    """
    system = platform.system().lower()
    if system.startswith("win"):
        # Windows ping: -n is the count, -w the timeout in milliseconds
        cmd = ["ping", "-n", "1", "-w", str(timeout_ms), target]
    else:
        # Linux ping: -c is the count, -W the timeout in seconds
        # (macOS ping interprets -W as milliseconds, so this branch assumes Linux)
        timeout_s = max(int(timeout_ms / 1000), 1)
        cmd = ["ping", "-c", "1", "-W", str(timeout_s), target]
    try:
        result = subprocess.run(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True
        )
    except Exception as e:
        return False, None, f"ping execution error: {e}"
    success = (result.returncode == 0)
    latency = parse_latency(result.stdout)
    raw_output = result.stdout if result.stdout.strip() else result.stderr
    return success, latency, raw_output


def ping_multi(target: str, count: int, timeout_ms: int = 1000, interval_sec: float = 0.2):
    """
    Ping the same target several times and return (summary, detail_rows, last_output).
    """
    detail_rows = []
    last_output = ""
    for i in range(count):
        success, latency_ms, raw_output = ping_once(target, timeout_ms=timeout_ms)
        last_output = raw_output
        detail_rows.append({
            "success": success,
            "latency_ms": latency_ms,
            "timestamp": datetime.now().strftime("%H:%M:%S"),
        })
        # Pause between pings, but not after the last one
        if i < count - 1:
            time.sleep(interval_sec)
    total = len(detail_rows)
    success_cnt = sum(1 for r in detail_rows if r["success"])
    fail_cnt = total - success_cnt
    loss_rate = round(fail_cnt / total * 100, 1) if total > 0 else 0.0
    latencies = [r["latency_ms"] for r in detail_rows if r["latency_ms"] is not None]
    if latencies:
        min_ms = round(min(latencies), 1)
        max_ms = round(max(latencies), 1)
        avg_ms = round(sum(latencies) / len(latencies), 1)
    else:
        min_ms = max_ms = avg_ms = "-"
    summary = {
        "total": total,
        "success": success_cnt,
        "fail": fail_cnt,
        "loss_rate": loss_rate,
        "min_ms": min_ms,
        "max_ms": max_ms,
        "avg_ms": avg_ms,
    }
    return summary, detail_rows, last_output


# ---------------- Port scan helpers ----------------
def scan_port(target: str, port: int, timeout: float = 1.0):
    """
    Scan a single TCP port and return (is_open, note).
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.settimeout(timeout)
    try:
        # connect_ex returns 0 on success and an error code otherwise
        result = sock.connect_ex((target, port))
        if result == 0:
            return True, "connected"
        else:
            return False, f"connection failed (code: {result})"
    except Exception as e:
        return False, f"exception: {e}"
    finally:
        sock.close()


# ---------------- Flask routes ----------------
@app.route("/", methods=["GET"])
def index():
    return render_template_string(
        PAGE_TEMPLATE,
        target="",
        count=4,
        continuous=False,
        ports="",
        concurrency=20,
        summary=None,
        detail=None,
        raw_output=None,
    )


@app.route("/run_test", methods=["POST"])
def run_test():
    target = request.form.get("target", "").strip()
    count_str = request.form.get("count", "4").strip()
    continuous = bool(request.form.get("continuous"))
    ports_input = request.form.get("ports", "").strip()
    summary = None
    detail = None
    raw_output = None
    # Without a target, re-render the form with the submitted values
    if not target:
        return render_template_string(
            PAGE_TEMPLATE,
            target=target,
            count=count_str,
            continuous=continuous,
            ports=ports_input,
            concurrency=20,
            summary=None,
            detail=None,
            raw_output=None,
        )
    try:
        count = int(count_str)
        if count <= 0:
            count = 1
    except ValueError:
        count = 4
    summary, detail, raw_output = ping_multi(target, count=count, timeout_ms=1000)
    return render_template_string(
        PAGE_TEMPLATE,
        target=target,
        count=count,
        continuous=False,
        ports=ports_input,
        concurrency=20,
        summary=summary,
        detail=detail,
        raw_output=raw_output,
    )


@app.route("/api/ping_once", methods=["GET"])
def api_ping_once():
    target = request.args.get("target", "").strip()
    if not target:
        return jsonify({
            "success": False,
            "latency_ms": None,
            "target": "",
            "timestamp": datetime.now().strftime("%H:%M:%S"),
            "error": "target parameter is required",
        })
    success, latency_ms, raw_output = ping_once(target, timeout_ms=1000)
    return jsonify({
        "success": success,
        "latency_ms": latency_ms,
        "target": target,
        "timestamp": datetime.now().strftime("%H:%M:%S"),
        "raw_output": raw_output,
    })


@app.route("/api/scan_port", methods=["GET"])
def api_scan_port():
    target = request.args.get("target", "").strip()
    port_str = request.args.get("port", "").strip()
    if not target or not port_str:
        return jsonify({
            "open": False,
            "note": "The target or port parameter is missing.",
        })
    try:
        port = int(port_str)
    except ValueError:
        return jsonify({
            "open": False,
            "note": "Invalid port number.",
        })
    is_open, note = scan_port(target, port, timeout=1.0)
    return jsonify({
        "open": is_open,
        "note": note,
    })
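

if __name__ == "__main__":
    # threaded=True lets the server handle several /api/scan_port requests at once.
    # Note: debug=True together with host="0.0.0.0" exposes the Werkzeug debugger
    # to the network, so run it this way only on a trusted test network.
    app.run(host="0.0.0.0", port=9999, debug=True, threaded=True)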
========== end =====================
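
The source above gets its concurrency from the browser: several JavaScript workers each call /api/scan_port, and the Flask server answers them in separate threads. For running a scan without the web page, the same idea can be sketched with a thread pool from the standard library. `check_port` and `scan_ports` are hypothetical names that do not appear in the source, and the sketch assumes every port is already a valid number between 1 and 65535.

```python
import socket
from concurrent.futures import ThreadPoolExecutor


def check_port(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.settimeout(timeout)
        try:
            # connect_ex returns 0 on success instead of raising on refusal
            return sock.connect_ex((host, port)) == 0
        except OSError:
            return False  # for example, the host name could not be resolved


def scan_ports(host: str, ports: list[int], workers: int = 20) -> dict[int, bool]:
    """Scan ports concurrently with at most `workers` connections in flight."""
    if not ports:
        return {}
    with ThreadPoolExecutor(max_workers=min(workers, len(ports))) as pool:
        # pool.map yields results in the same order as the input ports
        results = pool.map(lambda p: check_port(host, p), ports)
        return dict(zip(ports, results))
```

Calling `scan_ports("127.0.0.1", [22, 80, 9999])` returns a dictionary that maps each port to True or False. The default of 20 workers matches the default concurrency shown on the page; as with the web version, only scan hosts you are permitted to test.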