查看原文
其他

【实用】用Python实现快速Ping一个IP网段地址

散尽浮华 释然IT杂谈 2023-04-23

转载于:www.yjsec.com/2020/11/07

ping命令是我们检查网络中最常用的命令,作为网络人员,基本上每天都会用到,可以很好地帮助我们分析和判定网络故障,如果有10设备,100台设备,1000台设备怎么办?一个个ping过去人都要疯掉了,这种情况在大型网络中我们有可能遇到,那怎么办呢?我们今天来看下如何用python来实现批量ping测试主机。


代码如下:

#!/usr/bin/python3# -*- coding: utf-8 -*-
import osimport argparseimport socketimport structimport selectimport time

ICMP_ECHO_REQUEST = 8 # Platform specificDEFAULT_TIMEOUT = 0.1DEFAULT_COUNT = 4

class Pinger(object): """ Pings to a host -- the Pythonic way"""
def __init__(self, target_host, count=DEFAULT_COUNT, timeout=DEFAULT_TIMEOUT): self.target_host = target_host self.count = count self.timeout = timeout

def do_checksum(self, source_string): """ Verify the packet integritity """ sum = 0 max_count = (len(source_string)/2)*2 count = 0 while count < max_count:
val = source_string[count + 1]*256 + source_string[count] sum = sum + val sum = sum & 0xffffffff count = count + 2
if max_count<len(source_string): sum = sum + ord(source_string[len(source_string) - 1]) sum = sum & 0xffffffff
sum = (sum >> 16) + (sum & 0xffff) sum = sum + (sum >> 16) answer = ~sum answer = answer & 0xffff answer = answer >> 8 | (answer << 8 & 0xff00) return answer
def receive_pong(self, sock, ID, timeout): """ Receive ping from the socket. """ time_remaining = timeout while True: start_time = time.time() readable = select.select([sock], [], [], time_remaining) time_spent = (time.time() - start_time) if readable[0] == []: # Timeout return
time_received = time.time() recv_packet, addr = sock.recvfrom(1024) icmp_header = recv_packet[20:28] type, code, checksum, packet_ID, sequence = struct.unpack( "bbHHh", icmp_header ) if packet_ID == ID: bytes_In_double = struct.calcsize("d") time_sent = struct.unpack("d", recv_packet[28:28 + bytes_In_double])[0] return time_received - time_sent
time_remaining = time_remaining - time_spent if time_remaining <= 0: return

def send_ping(self, sock, ID): """ Send ping to the target host """ target_addr = socket.gethostbyname(self.target_host)
my_checksum = 0
# Create a dummy heder with a 0 checksum. header = struct.pack("bbHHh", ICMP_ECHO_REQUEST, 0, my_checksum, ID, 1) bytes_In_double = struct.calcsize("d") data = (192 - bytes_In_double) * "Q" data = struct.pack("d", time.time()) + bytes(data.encode('utf-8'))
# Get the checksum on the data and the dummy header. my_checksum = self.do_checksum(header + data) header = struct.pack( "bbHHh", ICMP_ECHO_REQUEST, 0, socket.htons(my_checksum), ID, 1 ) packet = header + data sock.sendto(packet, (target_addr, 1))

def ping_once(self): """ Returns the delay (in seconds) or none on timeout. """ icmp = socket.getprotobyname("icmp") try: sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, icmp) except socket.error as e: if e.errno == 1: # Not superuser, so operation not permitted e.msg += "ICMP messages can only be sent from root user processes" raise socket.error(e.msg) except Exception as e: print("Exception: %s" %(e))
my_ID = os.getpid() & 0xFFFF
self.send_ping(sock, my_ID) delay = self.receive_pong(sock, my_ID, self.timeout) sock.close() return delay

def ping(self): """ Run the ping process """ for i in range(self.count): print ("Ping to %s..." % self.target_host,) try: delay = self.ping_once() except socket.gaierror as e: print ("Ping failed. (socket error: '%s')" % e[1]) break
if delay == None: print ("Ping failed. (timeout within %ssec.)" % self.timeout) else: delay = delay * 1000 print("Get pong in %0.4fms" % delay)

if __name__ == '__main__': alive = []    host_prefix = '192.168.242.' for i in range(1, 255): host = host_prefix + str(i) pinger = Pinger(target_host=host)
delay = pinger.ping_once() if delay == None: print("Ping %s 失败,超时2秒" % host) else: print("ping %s = %s ms" % (host, round(delay * 1000, 4))) alive.append(host)        # time.sleep(0.5)

测试如下:



您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存