|
@@ -0,0 +1,261 @@
|
|
|
+from lxml import etree
|
|
|
+from datetime import datetime
|
|
|
+from OpenSSL import crypto
|
|
|
+from hashlib import sha1
|
|
|
+from base64 import b64encode
|
|
|
+
|
|
|
+
|
|
|
+class XAdESSigner:
|
|
|
+ """
|
|
|
+ XAdES-BES Enveloped signing of XML files
|
|
|
+ """
|
|
|
+
|
|
|
+ XMLNS_DS = 'http://www.w3.org/2000/09/xmldsig#'
|
|
|
+ XMLNS_XDS = 'http://uri.etsi.org/01903/v1.1.1#'
|
|
|
+ SIGNATURE_ALG = 'sha1'
|
|
|
+ DIGEST_ALG_SPEC = XMLNS_DS + SIGNATURE_ALG
|
|
|
+ SIGNATURE_ALG_SPEC = XMLNS_DS + 'rsa-' + SIGNATURE_ALG
|
|
|
+ CANONICAL_ALG_SPEC = 'http://www.w3.org/TR/2001/REC-xml-c14n-20010315'
|
|
|
+ SIGNEDPROPS_REFERENCE_TYPE = XMLNS_XDS + 'SignedProperties'
|
|
|
+
|
|
|
+ def __init__(self, xml_root, xml_reference_type, certificate_file):
|
|
|
+ """
|
|
|
+ Initialize object with XML root element, XML reference type URL and certificate file path
|
|
|
+ :param xml_root: lxml.etree.Element
|
|
|
+ :param xml_reference_type: str
|
|
|
+ :param certificate_file: str
|
|
|
+ """
|
|
|
+ self.xml_root = xml_root
|
|
|
+ self.xml_reference_type = xml_reference_type
|
|
|
+ self.certificate_file = certificate_file
|
|
|
+
|
|
|
+ def sign(self, certificate_password):
|
|
|
+ """
|
|
|
+ Sign instance's XML and return it
|
|
|
+ :param certificate_password: str
|
|
|
+ :return: lxml.etree.Element
|
|
|
+ """
|
|
|
+ main_id = self.__prepare_xml()
|
|
|
+ try:
|
|
|
+ with open(self.certificate_file, 'rb') as cf:
|
|
|
+ p12 = crypto.load_pkcs12(cf.read(), certificate_password)
|
|
|
+ self.__create_signature(main_id, p12.get_privatekey(), p12.get_certificate())
|
|
|
+ return self.xml_root
|
|
|
+ except IOError:
|
|
|
+ raise XaDESSignerException('Problem with accessing certificate file.')
|
|
|
+ except crypto.Error:
|
|
|
+ raise XaDESSignerException('Certificate password is wrong.')
|
|
|
+
|
|
|
+ def __prepare_xml(self, main_id='data'):
|
|
|
+ """
|
|
|
+ Check if XML already contains signature and add appropriate namespaces to root
|
|
|
+ """
|
|
|
+ elements_num = len(self.xml_root)
|
|
|
+ if (elements_num > 1 and
|
|
|
+ self.xml_root[elements_num - 1].prefix and 'Signature' in self.xml_root[elements_num - 1].tag):
|
|
|
+ raise XaDESSignerException('XML was already signed.')
|
|
|
+ nsmap = self.xml_root.nsmap.copy()
|
|
|
+ nsmap.update(self.__get_namespaces())
|
|
|
+ xml_root = etree.Element(self.xml_root.tag, nsmap=nsmap)
|
|
|
+ xml_root[:] = self.xml_root[:]
|
|
|
+ self.xml_root = xml_root
|
|
|
+ main_element = self.xml_root[0]
|
|
|
+ if 'Id' in main_element.keys() and len(main_element.attrib['Id']) > 0:
|
|
|
+ return main_element.attrib['Id']
|
|
|
+ main_element.set('Id', main_id)
|
|
|
+ return main_id
|
|
|
+
|
|
|
+ def __create_signature(self, main_id, private_key, certificate):
|
|
|
+ """
|
|
|
+ Create signature of all main XML parts along signed properties
|
|
|
+ :param main_id: str
|
|
|
+ :param certificate: OpenSSL.crypto.X509
|
|
|
+ :return: lxml.etree.Element
|
|
|
+ """
|
|
|
+ signature_id = 'SignatureId'
|
|
|
+ ds_object, signedprops_id = self.__create_signature_object(signature_id, certificate)
|
|
|
+ ds_signature = etree.Element(self.__get_namespaced_tag('ds', 'Signature'))
|
|
|
+ ds_signature.set('Id', signature_id)
|
|
|
+ ds_signedinfo = etree.SubElement(ds_signature, self.__get_namespaced_tag('ds', 'SignedInfo'))
|
|
|
+ ds_canonicalmethod = etree.SubElement(ds_signedinfo, self.__get_namespaced_tag('ds', 'CanonicalizationMethod'))
|
|
|
+ ds_canonicalmethod.set('Algorithm', self.CANONICAL_ALG_SPEC)
|
|
|
+ ds_signaturemethod = etree.SubElement(ds_signedinfo, self.__get_namespaced_tag('ds', 'SignatureMethod'))
|
|
|
+ ds_signaturemethod.set('Algorithm', self.SIGNATURE_ALG_SPEC)
|
|
|
+ ds_reference_main = etree.SubElement(ds_signedinfo, self.__get_namespaced_tag('ds', 'Reference'))
|
|
|
+ ds_reference_main.set('Type', self.xml_reference_type)
|
|
|
+ ds_reference_main.set('URI', '#' + main_id.strip('#'))
|
|
|
+ ds_refmain_digestmethod = etree.SubElement(ds_reference_main, self.__get_namespaced_tag('ds', 'DigestMethod'))
|
|
|
+ ds_refmain_digestmethod.set('Algorithm', self.DIGEST_ALG_SPEC)
|
|
|
+ ds_refmain_digestvalue = etree.SubElement(ds_reference_main, self.__get_namespaced_tag('ds', 'DigestValue'))
|
|
|
+ ds_reference_signedprops = etree.SubElement(ds_signedinfo, self.__get_namespaced_tag('ds', 'Reference'))
|
|
|
+ ds_reference_signedprops.set('Type', self.SIGNEDPROPS_REFERENCE_TYPE)
|
|
|
+ ds_reference_signedprops.set('URI', '#' + signedprops_id.strip('#'))
|
|
|
+ ds_refsp_digestmethod = etree.SubElement(
|
|
|
+ ds_reference_signedprops,
|
|
|
+ self.__get_namespaced_tag('ds', 'DigestMethod')
|
|
|
+ )
|
|
|
+ ds_refsp_digestmethod.set('Algorithm', self.DIGEST_ALG_SPEC)
|
|
|
+ ds_refsp_digestvalue = etree.SubElement(ds_reference_signedprops, self.__get_namespaced_tag('ds', 'DigestValue'))
|
|
|
+ ds_signaturevalue = etree.SubElement(ds_signature, self.__get_namespaced_tag('ds', 'SignatureValue'))
|
|
|
+ ds_keyinfo = etree.SubElement(ds_signature, self.__get_namespaced_tag('ds', 'KeyInfo'))
|
|
|
+ ds_X509data = etree.SubElement(ds_keyinfo, self.__get_namespaced_tag('ds', 'X509Data'))
|
|
|
+ etree.SubElement(
|
|
|
+ ds_X509data,
|
|
|
+ self.__get_namespaced_tag('ds', 'X509Certificate')
|
|
|
+ ).text = self.__get_certificate_der(certificate)
|
|
|
+
|
|
|
+ # Append signature object to signature and signature to root to inherit all used namespaces
|
|
|
+ ds_signature.append(ds_object)
|
|
|
+ self.xml_root.append(ds_signature)
|
|
|
+
|
|
|
+ # We generate digest values after XML is fully composed
|
|
|
+ # First we generate digest value for main elements in XML
|
|
|
+ ds_refmain_digestvalue.text = self.__generate_digest_value(self.xml_root[:-1])
|
|
|
+ # Then we generate digest for SignedProperties contained inside signature object
|
|
|
+ ds_refsp_digestvalue.text = self.__generate_digest_value(ds_signature[-1][0][:1])
|
|
|
+
|
|
|
+ # Sign XML with private key after we have completed SignedInfo with digests
|
|
|
+ ds_signaturevalue.text = self.__get_signed_xml(private_key, ds_signature[:1])
|
|
|
+
|
|
|
+ return ds_signature
|
|
|
+
|
|
|
+ def __get_signed_xml(self, private_key, nodes):
|
|
|
+ """
|
|
|
+ Sign canonicalized XML with provided private key, generated from applied nodes
|
|
|
+ :param private_key: OpenSSL.crypto.PKey
|
|
|
+ :param nodes: list
|
|
|
+ :return: str
|
|
|
+ """
|
|
|
+ return b64encode(crypto.sign(private_key, self.__get_canonical_xml(nodes), self.SIGNATURE_ALG)).decode()
|
|
|
+
|
|
|
+ def __get_certificate_der(self, certificate):
|
|
|
+ """
|
|
|
+ Return certificate in DER format encoded in base64
|
|
|
+ :param certificate: OpenSSL.crypto.X509
|
|
|
+ :return: str
|
|
|
+ """
|
|
|
+ return b64encode(crypto.dump_certificate(crypto.FILETYPE_ASN1, certificate)).decode()
|
|
|
+
|
|
|
+ def __get_canonical_xml(self, nodes):
|
|
|
+ """
|
|
|
+ Generate canonical XML from applied nodes and replace all unknown namespaces
|
|
|
+ :param nodes: list
|
|
|
+ :return: bytearray
|
|
|
+ """
|
|
|
+ canonical_xml = b''
|
|
|
+ for node in nodes:
|
|
|
+ canonical_xml += etree.tostring(node, method='c14n', with_comments=False)
|
|
|
+ return canonical_xml.replace(b' xmlns=""', b'')
|
|
|
+
|
|
|
+ def __generate_digest_value(self, nodes):
|
|
|
+ """
|
|
|
+ Generate digest value for generated canonical XML from applied nodes
|
|
|
+ :param nodes: list
|
|
|
+ :return: str
|
|
|
+ """
|
|
|
+ return XAdESSigner.base64_sha1_str(self.__get_canonical_xml(nodes))
|
|
|
+
|
|
|
+ def __create_signature_object(self, signature_id, certificate):
|
|
|
+ """
|
|
|
+ Create signature object which must also be signed as part of XadES signature
|
|
|
+ :param signature_id: str
|
|
|
+ :param certificate: OpenSSL.crypto.X509
|
|
|
+ :return: tuple
|
|
|
+ """
|
|
|
+ signedprops_id = 'SignedPropertiesId'
|
|
|
+ ds_object = etree.Element(self.__get_namespaced_tag('ds', 'Object'))
|
|
|
+ xds_qualifyingprops = etree.SubElement(ds_object, self.__get_namespaced_tag('xds', 'QualifyingProperties'))
|
|
|
+ xds_qualifyingprops.set('Target', '#' + signature_id.strip('#'))
|
|
|
+ xds_signedprops = etree.SubElement(xds_qualifyingprops, self.__get_namespaced_tag('xds', 'SignedProperties'))
|
|
|
+ xds_signedprops.set('Id', signedprops_id)
|
|
|
+ xds_signedsigprops = etree.SubElement(
|
|
|
+ xds_signedprops,
|
|
|
+ self.__get_namespaced_tag('xds', 'SignedSignatureProperties')
|
|
|
+ )
|
|
|
+ etree.SubElement(
|
|
|
+ xds_signedsigprops,
|
|
|
+ self.__get_namespaced_tag('xds', 'SigningTime')
|
|
|
+ ).text = datetime.utcnow().isoformat()[:-3] + 'Z'
|
|
|
+ xds_signingcert = etree.SubElement(xds_signedsigprops, self.__get_namespaced_tag('xds', 'SigningCertificate'))
|
|
|
+ xds_cert = etree.SubElement(xds_signingcert, self.__get_namespaced_tag('xds', 'Cert'))
|
|
|
+ xds_certdigest = etree.SubElement(xds_cert, self.__get_namespaced_tag('xds', 'CertDigest'))
|
|
|
+ xds_digestmethod = etree.SubElement(xds_certdigest, self.__get_namespaced_tag('xds', 'DigestMethod'))
|
|
|
+ xds_digestmethod.set('Algorithm', self.DIGEST_ALG_SPEC)
|
|
|
+ etree.SubElement(
|
|
|
+ xds_certdigest,
|
|
|
+ self.__get_namespaced_tag('xds', 'DigestValue')
|
|
|
+ ).text = self.__get_certificate_digest(certificate)
|
|
|
+ xds_issuerserial = etree.SubElement(xds_cert, self.__get_namespaced_tag('xds', 'IssuerSerial'))
|
|
|
+ etree.SubElement(
|
|
|
+ xds_issuerserial,
|
|
|
+ self.__get_namespaced_tag('ds', 'X509IssuerName')
|
|
|
+ ).text = self.__get_certificate_issuer(certificate)
|
|
|
+ etree.SubElement(
|
|
|
+ xds_issuerserial,
|
|
|
+ self.__get_namespaced_tag('ds', 'X509SerialNumber')
|
|
|
+ ).text = '%d' % certificate.get_serial_number()
|
|
|
+ xds_signaturepolicyid = etree.SubElement(
|
|
|
+ xds_signedsigprops,
|
|
|
+ self.__get_namespaced_tag('xds', 'SignaturePolicyIdentifier')
|
|
|
+ )
|
|
|
+ etree.SubElement(xds_signaturepolicyid, self.__get_namespaced_tag('xds', 'SignaturePolicyImplied'))
|
|
|
+ return ds_object, signedprops_id
|
|
|
+
|
|
|
+ def __get_namespaced_tag(self, namespace, tag):
|
|
|
+ """
|
|
|
+ Generate tag with prefixed namespace for lxml element name
|
|
|
+ :param namespace: str
|
|
|
+ :param tag: str
|
|
|
+ :return: str
|
|
|
+ """
|
|
|
+ namespaces = self.__get_namespaces()
|
|
|
+ if namespace not in namespaces:
|
|
|
+ raise XaDESSignerValueError('Namespace unknown.')
|
|
|
+ return '{%s}%s' % (namespaces[namespace], tag)
|
|
|
+
|
|
|
+ def __get_namespaces(self):
|
|
|
+ """
|
|
|
+ Wrapper method which returns available namespaces in dictionary form
|
|
|
+ :return: dict
|
|
|
+ """
|
|
|
+ return {
|
|
|
+ 'ds': self.XMLNS_DS,
|
|
|
+ 'xds': self.XMLNS_XDS
|
|
|
+ }
|
|
|
+
|
|
|
+ def __get_certificate_digest(self, certificate):
|
|
|
+ """
|
|
|
+ Get certificate digest
|
|
|
+ :param certificate: OpenSSL.crypto.X509
|
|
|
+ :return: str
|
|
|
+ """
|
|
|
+ return XAdESSigner.base64_sha1_str(crypto.dump_certificate(crypto.FILETYPE_ASN1, certificate))
|
|
|
+
|
|
|
+ def __get_certificate_issuer(self, certificate):
|
|
|
+ """
|
|
|
+ Get certificate issuer - consists of everything known about issuer, separated with commas
|
|
|
+ :param certificate: OpenSSL.crypto.X509
|
|
|
+ :return: str
|
|
|
+ """
|
|
|
+ return ','.join(m.decode() + '=' + v.decode() for m, v in certificate.get_issuer().get_components())
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def base64_sha1_str(string):
|
|
|
+ """
|
|
|
+ Hash string or bytearray to SHA1 and encode it with base64
|
|
|
+ :param string: str or byte
|
|
|
+ :return: str
|
|
|
+ """
|
|
|
+ try:
|
|
|
+ string = string.encode()
|
|
|
+ except AttributeError:
|
|
|
+ pass
|
|
|
+ return b64encode(sha1(string).digest()).decode()
|
|
|
+
|
|
|
+
|
|
|
+class XaDESSignerException(Exception):
|
|
|
+ pass
|
|
|
+
|
|
|
+
|
|
|
+class XaDESSignerValueError(Exception):
|
|
|
+ pass
|