123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399 |
- # -*- coding: utf-8 -*-
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- # implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- """
- Contains all core validation functions.
- """
- import os
- import re
- import socket
- import logging
- from . import utils
- from .exceptions import ParamValidationError
# Public API of this module: the validation error type plus every public
# validator defined below. validate_multi_regexp and validate_neutron are
# public validators too and are therefore exported alongside the others.
__all__ = (
    'ParamValidationError',
    'validate_integer',
    'validate_float',
    'validate_regexp',
    'validate_multi_regexp',
    'validate_port',
    'validate_not_empty',
    'validate_options',
    'validate_multi_options',
    'validate_ip',
    'validate_multi_ip',
    'validate_file',
    'validate_ping',
    'validate_multi_ping',
    'validate_ssh',
    'validate_multi_ssh',
    'validate_sshkey',
    'validate_ldap_url',
    'validate_ldap_dn',
    'validate_export',
    'validate_multi_export',
    'validate_writeable_directory',
    'validate_neutron',
)
def validate_integer(param, options=None):
    """
    Check that the given param can be converted with int().

    Falsy values (empty string, None, 0, ...) are accepted without any
    check. The options argument is only echoed in the debug log.

    Raises ParamValidationError if int(param) raises ValueError.
    """
    if param:
        opts = options or []
        try:
            int(param)
        except ValueError:
            logging.debug('validate_integer(%s, options=%s) failed.' %
                          (param, opts))
            raise ParamValidationError(
                'Given value is not an integer: %s' % param)
def validate_float(param, options=None):
    """
    Check that the given param can be converted with float().

    Falsy values (empty string, None, 0, ...) are accepted without any
    check. The options argument is only echoed in the debug log.

    Raises ParamValidationError if float(param) raises ValueError.
    """
    if param:
        opts = options or []
        try:
            float(param)
        except ValueError:
            logging.debug('validate_float(%s, options=%s) failed.' %
                          (param, opts))
            raise ParamValidationError(
                'Given value is not a float: %s' % param)
def validate_regexp(param, options=None):
    """
    Check that the given param matches at least one regular expression.

    options is the list of patterns; each one is tried with re.search(),
    so a pattern may match anywhere in param unless it is anchored.
    Falsy params are accepted without any check.

    Raises ParamValidationError if no pattern matches (which includes the
    case of an empty pattern list).
    """
    if not param:
        return
    patterns = options or []
    if any(re.search(pattern, param) for pattern in patterns):
        return
    logging.debug('validate_regexp(%s, options=%s) failed.' %
                  (param, patterns))
    raise ParamValidationError(
        'Given value does not match required regular expression: %s' % param)
def validate_multi_regexp(param, options=None):
    """
    Validate every comma separated value in param with validate_regexp.

    Each value is stripped of surrounding whitespace before it is checked
    against the regular expressions given in options. An empty or None
    param is accepted without any check, as the other validators do.

    Raises ParamValidationError if any value matches none of the patterns.
    """
    # Guard first: None has no split(), and there is nothing to validate.
    if not param:
        return
    options = options or []
    for value in param.split(','):
        validate_regexp(value.strip(), options=options)
def validate_port(param, options=None):
    """
    Check that the given param is a decimal port number.

    Accepted values are the integers 0 through 65535, both inclusive.
    Falsy params are accepted without any check.

    Raises ParamValidationError if param is not an integer (reported by
    validate_integer) or lies outside of that range.
    """
    if not param:
        return
    options = options or []
    validate_integer(param, options)
    port = int(param)
    # 65535 is the highest valid port number, so the upper bound is inclusive.
    if not 0 <= port <= 65535:
        logging.debug('validate_port(%s, options=%s) failed.' %
                      (param, options))
        msg = 'Given value is outside the range of (0, 65535): %s'
        raise ParamValidationError(msg % param)
def validate_not_empty(param, options=None):
    """
    Check that the given param is not empty.

    Any truthy value passes. The boolean False passes as well, because it
    is treated as a real value rather than as "nothing given". Every other
    falsy value (None, '', 0, empty containers) is rejected.

    Raises ParamValidationError for rejected values.
    """
    opts = options or []
    if param or param is False:
        return
    logging.debug('validate_not_empty(%s, options=%s) failed.' %
                  (param, opts))
    raise ParamValidationError('Given value is not allowed: %s' % param)
def validate_options(param, options=None):
    """
    Check that the given param is one of the allowed values in options.

    Falsy params are accepted without any check; use validate_not_empty
    in addition when a value is mandatory.

    Raises ParamValidationError if param is not a member of options.
    """
    if not param:
        return
    options = options or []
    # No separate emptiness check is needed here: falsy values returned
    # above, so param is guaranteed to be non-empty at this point.
    if param not in options:
        logging.debug('validate_options(%s, options=%s) failed.' %
                      (param, options))
        msg = 'Given value is not member of allowed values %s: %s'
        raise ParamValidationError(msg % (options, param))
def validate_multi_options(param, options=None):
    """
    Validate every comma separated value in param with validate_options.

    Each value is stripped of surrounding whitespace first. An empty or
    None param is accepted without any check.

    Raises ParamValidationError (from validate_options) for the first
    value that is not a member of options.
    """
    if param:
        allowed = options or []
        for item in param.split(','):
            validate_options(item.strip(), options=allowed)
def validate_ip(param, options=None):
    """
    Check that the given param is an IPv4 or IPv6 address.

    The value is parsed with socket.inet_pton(), first as IPv4 and then as
    IPv6. On success the matching address family (socket.AF_INET or
    socket.AF_INET6) is returned. Falsy params are accepted and yield None.

    Raises ParamValidationError if param parses as neither family.
    """
    if not param:
        return None
    for family in (socket.AF_INET, socket.AF_INET6):
        try:
            socket.inet_pton(family, param)
        except socket.error:
            # Not an address of this family, try the next one.
            continue
        return family
    logging.debug('validate_ip(%s, options=%s) failed.' %
                  (param, options))
    raise ParamValidationError(
        'Given host is not in IP address format: %s' % param)
def validate_multi_ip(param, options=None):
    """
    Validate every comma separated address in param with validate_ip.

    Anything after the first '/' of an entry (for example a CIDR prefix
    length) is dropped, and the remaining host part is stripped of
    whitespace before it is checked. An empty or None param is accepted
    without any check.

    Raises ParamValidationError if any entry is not in IPv4 or IPv6
    address format.
    """
    # Guard first: None has no split(), and there is nothing to validate.
    if not param:
        return
    for entry in param.split(','):
        address = entry.split('/', 1)[0]
        validate_ip(address.strip(), options)
def validate_file(param, options=None):
    """
    Check that the given param is the path of an existing regular file.

    The check is os.path.isfile(param). Falsy params are accepted without
    any check. The options argument is only echoed in the debug log.

    Raises ParamValidationError if the path is not an existing file.
    """
    if not param or os.path.isfile(param):
        return
    opts = options or []
    logging.debug('validate_file(%s, options=%s) failed.' %
                  (param, opts))
    raise ParamValidationError('Given file does not exist: %s' % param)
def validate_writeable_directory(param, options=None):
    """
    Check that the given param is usable as a writeable directory.

    A leading '~' is expanded first. The path is accepted when either
      * it exists, is a directory and is writeable, or
      * it does not exist yet and its parent directory is writeable
        (so the directory could be created there).
    Falsy params are accepted without any check.

    Raises ParamValidationError otherwise, in particular when the path
    exists but is a regular file or a directory that is not writeable.
    """
    if not param:
        return
    options = options or []
    path = os.path.expanduser(param)
    if os.path.exists(path):
        # An existing path must itself qualify; a writeable parent must not
        # make an existing file or a read-only directory acceptable.
        usable = os.path.isdir(path) and os.access(path, os.W_OK)
    else:
        parent = os.path.normpath(os.path.join(path, os.pardir))
        usable = os.access(parent, os.W_OK)
    if not usable:
        logging.debug('validate_writeable_directory(%s, options=%s) failed.' %
                      (param, options))
        msg = 'Given directory does not exist or is not writeable: %s'
        raise ParamValidationError(msg % param)
def validate_ping(param, options=None):
    """
    Check that the given host answers to an ICMP echo request.

    Runs '/bin/ping -c 1 <param>' through utils.execute (called with
    can_fail=False) and inspects the return code it reports. Falsy params
    are accepted without any check.

    Raises ParamValidationError if the return code is not 0.
    """
    if not param:
        return
    opts = options or []
    command = ['/bin/ping', '-c', '1', str(param)]
    # Only the return code matters; the command output is not inspected.
    rc, _output = utils.execute(command, can_fail=False)
    if rc == 0:
        return
    logging.debug('validate_ping(%s, options=%s) failed.' %
                  (param, opts))
    raise ParamValidationError('Given host is unreachable: %s' % param)
def validate_multi_ping(param, options=None):
    """
    Validate every comma separated host in param with validate_ping.

    Each host is stripped of surrounding whitespace first. An empty or
    None param is accepted without any check.

    Raises ParamValidationError if any host does not answer to an ICMP
    echo request.
    """
    # Guard first: None has no split(), and there is nothing to validate.
    if not param:
        return
    options = options or []
    for host in param.split(","):
        # Pass options along so the debug log of validate_ping shows them.
        validate_ping(host.strip(), options)
# "host:port" keys of endpoints that were already probed successfully, so
# that each endpoint is contacted at most once per process.
_tested_ports = []


def touch_port(host, port):
    """
    Check that provided host is listening on provided port.

    Opens a TCP connection to (host, port) and closes it again. Endpoints
    that were probed successfully are remembered in _tested_ports and are
    not contacted again.

    Returns None. Socket errors raised while connecting or shutting down
    are propagated to the caller; the endpoint is not cached in that case.
    """
    key = "%s:%d" % (host, port)
    if key in _tested_ports:
        return
    sock = socket.create_connection((host, port))
    try:
        sock.shutdown(socket.SHUT_RDWR)
    finally:
        # Always release the descriptor, even when shutdown() fails.
        sock.close()
    _tested_ports.append(key)
def validate_ssh(param, options=None):
    """
    Check that the given host listens on port 22.

    The host name is stripped of surrounding whitespace and probed with
    touch_port(host, 22). Falsy params are accepted without any check.

    Raises ParamValidationError if the probe raises socket.error.
    """
    if param:
        opts = options or []
        host = param.strip()
        try:
            touch_port(host, 22)
        except socket.error:
            logging.debug('validate_ssh(%s, options=%s) failed.' %
                          (param, opts))
            raise ParamValidationError(
                'Given host does not listen on port 22: %s' % param)
def validate_multi_ssh(param, options=None):
    """
    Validate every comma separated host in param with validate_ssh.

    Each host is stripped of surrounding whitespace first, so blank
    entries (for example after a trailing comma) are skipped by
    validate_ssh instead of being probed as an empty host name. An empty
    or None param is accepted without any check.

    Raises ParamValidationError if any host does not listen on port 22.
    """
    # Guard first: None has no split(), and there is nothing to validate.
    if not param:
        return
    options = options or []
    for host in param.split(","):
        # Pass options along so the debug log of validate_ssh shows them.
        validate_ssh(host.strip(), options)
def validate_sshkey(param, options=None):
    """
    Check that the file given in param looks like a public SSH key.

    Only the first line of the file is inspected: it has to contain
    'ssh-' or 'ecdsa', and it must not contain an RSA/DSA private key
    header. Falsy params are accepted without any check. Errors raised by
    open() (for example for a missing file) are not handled here.

    Raises ParamValidationError if the first line fails those checks.
    """
    if not param:
        return
    with open(param) as key_file:
        first_line = key_file.readline()
    error = None
    if re.search('ssh-|ecdsa', first_line) is None:
        error = ('Invalid content header in %s, Public SSH key is required.'
                 % param)
    # Checked second on purpose: the private key message takes precedence.
    if re.search('BEGIN [RD]SA PRIVATE KEY', first_line):
        error = 'Public SSH key is required. You passed private key.'
    if error:
        raise ParamValidationError(error)
def validate_ldap_url(param, options=None):
    """
    Check that the given param is a valid LDAP URL.

    The value is parsed with ldapurl.LDAPUrl; the ldapurl module is
    imported lazily so it is only needed when a value is actually
    validated. Falsy params are accepted without any check.

    Raises ParamValidationError if ldapurl cannot be imported or if
    parsing the value raises ValueError.
    """
    if not param:
        return
    try:
        import ldapurl
    except ImportError:
        raise ParamValidationError(
            'The python ldap package is required to use this functionality.')
    try:
        ldapurl.LDAPUrl(param)
    except ValueError as error:
        raise ParamValidationError(
            'The given string [%s] is not a valid LDAP URL: %s' %
            (param, error))
def validate_ldap_dn(param, options=None):
    """
    Check that the given param is a valid LDAP DN.

    The value is parsed with ldap.dn.str2dn; the ldap modules are imported
    lazily so they are only needed when a value is actually validated.
    Falsy params are accepted without any check.

    Raises ParamValidationError if the ldap package cannot be imported or
    if parsing the value raises ldap.DECODING_ERROR.
    """
    if not param:
        return
    try:
        import ldap
        import ldap.dn
    except ImportError:
        raise ParamValidationError(
            'The python ldap package is required to use this functionality.')
    try:
        ldap.dn.str2dn(param)
    except ldap.DECODING_ERROR as error:
        raise ParamValidationError(
            'The given string [%s] is not a valid LDAP DN: %s' %
            (param, error))
def validate_export(param, options=None):
    """
    Check that the given param is a valid nfs export ('<ip>:/<path>').

    The value has to split into exactly two parts at ':/'. The first part
    must be an IPv4 address, or an IPv6 address wrapped in square
    brackets; the second part (the path without its leading '/') must not
    be empty.

    Raises ParamValidationError if any of those conditions is not met.
    """
    msg = ('The nfs export [%s] is not a valid export - use squares around ipv6 addresses -.' %
           param)
    try:
        host, export = param.split(':/')
    except ValueError:
        # Either no ':/' separator at all or more than one of them.
        raise ParamValidationError(msg)
    bracketed = re.search(r'\[([^]]+)\]', host)
    if bracketed:
        # Square brackets are present: their content should be a valid
        # IPv6 address.
        candidate = bracketed.group(1)
        well_formed = utils.network.is_ipv6(candidate)
    else:
        # Without brackets this should be an IPv4 address. An IPv6 address
        # without square bracket notation is not allowed here, as the
        # mount will fail.
        candidate = host
        well_formed = utils.network.is_ipv4(host)
    if not well_formed:
        raise ParamValidationError(msg)
    validate_ip(candidate, options)
    if not export:
        raise ParamValidationError(msg)
def validate_multi_export(param, options=None):
    """
    Validate every comma separated nfs export in param with
    validate_export.

    The entries are passed on exactly as they appear between the commas
    (no whitespace stripping), and options is not forwarded.

    Raises ParamValidationError (from validate_export) for the first
    entry that is not a valid export.
    """
    entries = param.split(",")
    for entry in entries:
        validate_export(entry)
def validate_neutron(param, options=None):
    """
    Check that neutron is enabled, i.e. that param is 'y'.

    The value is first checked against options with validate_options.
    This is intended to make the user aware that nova-network has been
    removed in the Ocata cycle.

    Raises ParamValidationError if param is not a member of options or is
    anything other than 'y' (an empty value included).
    """
    validate_options(param, options=options)
    if param == 'y':
        return
    raise ParamValidationError(
        'Nova network support has been removed in Ocata. Neutron service must be enabled')
|