network.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138
  1. # -*- coding: utf-8 -*-
  2. # Licensed under the Apache License, Version 2.0 (the "License");
  3. # you may not use this file except in compliance with the License.
  4. # You may obtain a copy of the License at
  5. #
  6. # http://www.apache.org/licenses/LICENSE-2.0
  7. #
  8. # Unless required by applicable law or agreed to in writing, software
  9. # distributed under the License is distributed on an "AS IS" BASIS,
  10. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
  11. # implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import netifaces
  15. import socket
  16. import logging
  17. from ..exceptions import NetworkError
  18. from .shell import ScriptRunner
  19. netaddr_available = True
  20. try:
  21. import netaddr
  22. except ImportError:
  23. netaddr_available = False
  24. def get_localhost_ip():
  25. """
  26. Returns IP address of localhost.
  27. """
  28. # Try to get the IPv4 or IPv6 default gateway, then open a socket
  29. # to discover our local IP
  30. gw = None
  31. for protocol in (socket.AF_INET, socket.AF_INET6):
  32. try:
  33. gw = netifaces.gateways()['default'][protocol][0]
  34. if protocol == socket.AF_INET6:
  35. gw = gw + '%' + netifaces.gateways()['default'][protocol][1]
  36. discovered_protocol = protocol
  37. break
  38. except KeyError: # No default gw for this protocol
  39. continue
  40. else:
  41. raise NetworkError('Local IP address discovery failed. Please set '
  42. 'a default gateway for your system.')
  43. address = socket.getaddrinfo(gw, 0, discovered_protocol,
  44. socket.SOCK_DGRAM)[0]
  45. s = socket.socket(discovered_protocol, socket.SOCK_DGRAM)
  46. s.connect(address[4])
  47. # Remove chars after %. Does nothing on IPv4, removes scope id in IPv6
  48. loc_ip = s.getsockname()[0].split('%')[0]
  49. return loc_ip
  50. _host_cache = {}
  51. def host2ip(hostname, allow_localhost=False):
  52. """
  53. Converts given hostname to IP address. Raises NetworkError
  54. if conversion failed.
  55. """
  56. key = '{}:{}'.format(hostname, allow_localhost)
  57. if key in _host_cache:
  58. return _host_cache[key]
  59. try:
  60. ip_list = list(sockets[4][0] for sockets in
  61. socket.getaddrinfo(hostname, 22, 0, 0, socket.IPPROTO_TCP))
  62. if allow_localhost:
  63. ip = ip_list[0]
  64. else:
  65. routable = [ip for ip in ip_list if ip not in ('127.0.0.1', '::1')]
  66. if not routable:
  67. raise NameError("Host %s is not routable, please fix"
  68. "your /etc/hosts", host)
  69. if len(routable) > 1:
  70. logging.warning("Multiple IPs for host detected!")
  71. ip = routable[0]
  72. _host_cache[key] = ip
  73. return ip
  74. except NameError:
  75. # given hostname is localhost, return appropriate IP address
  76. return get_localhost_ip()
  77. except socket.error:
  78. raise NetworkError('Unknown hostname %s.' % hostname)
  79. except Exception as ex:
  80. raise NetworkError('Unknown error appeared: %s' % repr(ex))
  81. def is_ipv6(host):
  82. if not netaddr_available:
  83. raise ImportError(
  84. "netaddr module unavailable, install with pip install netaddr"
  85. )
  86. host = host.strip()
  87. try:
  88. return netaddr.IPAddress(host.strip('[]')).version == 6
  89. except netaddr.core.AddrFormatError:
  90. # Most probably a hostname
  91. return False
  92. def is_ipv4(host):
  93. if not netaddr_available:
  94. raise ImportError(
  95. "netaddr module unavailable, install with pip install netaddr"
  96. )
  97. host = host.strip()
  98. try:
  99. return netaddr.IPAddress(host).version == 4
  100. except netaddr.core.AddrFormatError:
  101. # Most probably a hostname
  102. return False
  103. def force_ip(host, allow_localhost=False):
  104. if not(is_ipv6(host) or is_ipv4(host)):
  105. host = host2ip(host, allow_localhost=allow_localhost)
  106. return host
  107. def device_from_ip(ip):
  108. server = ScriptRunner()
  109. server.append("DEVICE=($(ip -o address show to %s | cut -f 2 -d ' '))"
  110. % ip)
  111. # Ensure that the IP is only assigned to one interface
  112. server.append("if [ ! -z ${DISPLAY[1]} ]; then false; fi")
  113. # Test device, raises an exception if it doesn't exist
  114. server.append("ip link show \"$DEVICE\" > /dev/null")
  115. server.append("echo $DEVICE")
  116. rv, stdout = server.execute()
  117. return stdout.strip()