#!/bin/env python3 import hashlib import json import os import shutil import stat import telnetlib from datetime import datetime from typing import List from zoneinfo import ZoneInfo import paramiko from paramiko_expect import SSHClientInteraction class JsonConfigLoader: def __init__(self, fpath: str): with open(fpath, 'r', encoding='utf-8') as fp: self.data = json.load(fp) fp.close() @property def tftp_host(self) -> str: return self.data['tftp']['server'] @property def tftp_path(self) -> str: return self.data['tftp']['path'] @property def tftp_rootdir(self) -> str: return self.data['tftp']['rootdir'] @property def backup_destdir(self) -> str: return self.data['backup']['destdir'] @property def hosts(self) -> str: return self.data['hosts'] class ConnectionBase: TIMEOUT = 30 PROMPT_USERNAME = ['User(name)?:\\s*'] PROMPT_PASSWORD = ['Pass(word)?:\\s*'] PROMPT_COMMAND = ['.*\\]\\s*', '.*>\\s*', '.*#\\s*'] def __init__(self, hostname: str): self.hostname = hostname def login(self, username: str, password: str): self.username = username self.password = password def send(self, line: str): pass def expect(self, patterns: List): pass def wait_command_prompt(self): self.expect(self.PROMPT_COMMAND) def close(self): self.username = None self.password = None class ConnectionTelnet(ConnectionBase): def login(self, username: str, password: str): self.conn = telnetlib.Telnet(self.hostname, timeout=self.TIMEOUT) self.expect(self.PROMPT_USERNAME) self.send(username) self.expect(self.PROMPT_PASSWORD) self.send(password) self.wait_command_prompt() super().login(username, password) def send(self, line: str): # print('send:' + line) self.conn.write(line.encode('utf-8') + b'\n') def expect(self, patterns: List): # print('expect:' + ','.join(patterns)) self.conn.expect(list(map(lambda x: x.encode('utf-8'), patterns)), self.TIMEOUT) def close(self): self.conn.read_all() self.conn.close() self.conn = None super().close() class ConnectionSsh(ConnectionBase): def login(self, username: str, password: str): self.client = paramiko.SSHClient() self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) self.client.connect(hostname=self.hostname, username=username, password=password, timeout=self.TIMEOUT, look_for_keys=False) self.conn = SSHClientInteraction(self.client, timeout=self.TIMEOUT, display=False) self.send('') self.wait_command_prompt() super().login(username, password) def send(self, line: str): # print('send:' + line) self.conn.send(line) def expect(self, patterns: List): # print('expect:' + ','.join(patterns)) self.conn.expect(patterns, self.TIMEOUT) def close(self): self.client.close() self.conn = None self.client = None super().close() class SwitchConfigFetcher: def __init__(self, host: dict[str, str]): self.hostname = host['hostname'] self.protocol = host['protocol'] self.system = host['system'] self.username = host['username'] self.password = host['password'] self.enable = host['enable'] if 'enable' in host else None def fetch(self, tftp_server: str, tftp_fpath: str): conn = ConnectionTelnet(self.hostname) if self.protocol == 'telnet' else ConnectionSsh(self.hostname) conn.login(self.username, self.password) if self.system in ['s5100', 'a5120']: config = 'config.cfg' if self.system == 's5100' else 'startup.cfg' conn.send('tftp ' + tftp_server + ' put ' + config + ' ' + tftp_fpath) conn.wait_command_prompt() conn.send('quit') else: if self.enable is not None: conn.send('enable') if self.enable != "": conn.expect(ConnectionBase.PROMPT_PASSWORD) conn.send(self.enable) conn.wait_command_prompt() conn.send('copy running-config tftp://' + tftp_server + '/' + tftp_fpath) if self.system in ['n4000', 'n3000']: conn.expect(['.*\\(y/n\\)\\s*']) conn.send('y') if self.enable is not None: conn.wait_command_prompt() conn.send('exit') conn.wait_command_prompt() conn.send('exit') conn.close() class SwitchConfigRotater: def __init__(self, fpath: str): self.fpath = fpath def rotate(self, backupdir: str): if not os.path.isdir(backupdir): os.mkdir(backupdir) latest_fpath = os.path.join(backupdir, 'latest.cfg') if os.path.isfile(latest_fpath): hash_latest = self._sha256(latest_fpath) hash_current = self._sha256(self.fpath) if hash_latest != hash_current: mtime = datetime.fromtimestamp(os.stat(latest_fpath).st_mtime, tz=ZoneInfo('Asia/Tokyo')) now = mtime.strftime('%Y%m%d%H%M%S') rotate_fpath = os.path.join(backupdir, now + '.cfg') shutil.move(latest_fpath, rotate_fpath) shutil.copy(self.fpath, latest_fpath) os.chmod(latest_fpath, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP) else: shutil.copy(self.fpath, latest_fpath) os.chmod(latest_fpath, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP) def _sha256(self, fpath): h = hashlib.sha256() with open(fpath, 'rb') as f: h.update(f.read()) return h.hexdigest() def main(fpath: str): config = JsonConfigLoader(fpath) for host in config.hosts: fname = host['hostname'] + '.cfg' scf = SwitchConfigFetcher(host) scf.fetch(config.tftp_host, os.path.join(config.tftp_path, fname)) scr = SwitchConfigRotater(os.path.join(config.tftp_rootdir, config.tftp_path, fname)) scr.rotate(os.path.join(config.backup_destdir, host['hostname'])) if __name__ == '__main__': config_fpath = './swcfg-backup.json' main(config_fpath)