from lxml import etree from datetime import datetime from OpenSSL import crypto from hashlib import sha1 from base64 import b64encode class XAdESSigner: """ XAdES-BES Enveloped signing of XML files """ XMLNS_DS = 'http://www.w3.org/2000/09/xmldsig#' XMLNS_XDS = 'http://uri.etsi.org/01903/v1.1.1#' SIGNATURE_ALG = 'sha1' DIGEST_ALG_SPEC = XMLNS_DS + SIGNATURE_ALG SIGNATURE_ALG_SPEC = XMLNS_DS + 'rsa-' + SIGNATURE_ALG CANONICAL_ALG_SPEC = 'http://www.w3.org/TR/2001/REC-xml-c14n-20010315' SIGNEDPROPS_REFERENCE_TYPE = XMLNS_XDS + 'SignedProperties' def __init__(self, xml_root, xml_reference_type, certificate_file): """ Initialize object with XML root element, XML reference type URL and certificate file path :param xml_root: lxml.etree.Element :param xml_reference_type: str :param certificate_file: str """ self.xml_root = xml_root self.xml_reference_type = xml_reference_type self.certificate_file = certificate_file def sign(self, certificate_password): """ Sign instance's XML and return it :param certificate_password: str :return: lxml.etree.Element """ main_id = self.__prepare_xml() try: with open(self.certificate_file, 'rb') as cf: p12 = crypto.load_pkcs12(cf.read(), certificate_password) self.__create_signature(main_id, p12.get_privatekey(), p12.get_certificate()) return self.xml_root except IOError: raise XaDESSignerException('Problem with accessing certificate file.') except crypto.Error: raise XaDESSignerException('Certificate password is wrong.') def __prepare_xml(self, main_id='data'): """ Check if XML already contains signature and add appropriate namespaces to root """ elements_num = len(self.xml_root) if (elements_num > 1 and self.xml_root[elements_num - 1].prefix and 'Signature' in self.xml_root[elements_num - 1].tag): raise XaDESSignerException('XML was already signed.') nsmap = self.xml_root.nsmap.copy() nsmap.update(self.__get_namespaces()) xml_root = etree.Element(self.xml_root.tag, nsmap=nsmap) xml_root[:] = self.xml_root[:] self.xml_root = xml_root main_element = self.xml_root[0] if 'Id' in main_element.keys() and len(main_element.attrib['Id']) > 0: return main_element.attrib['Id'] main_element.set('Id', main_id) return main_id def __create_signature(self, main_id, private_key, certificate): """ Create signature of all main XML parts along signed properties :param main_id: str :param certificate: OpenSSL.crypto.X509 :return: lxml.etree.Element """ signature_id = 'SignatureId' ds_object, signedprops_id = self.__create_signature_object(signature_id, certificate) ds_signature = etree.Element(self.__get_namespaced_tag('ds', 'Signature')) ds_signature.set('Id', signature_id) ds_signedinfo = etree.SubElement(ds_signature, self.__get_namespaced_tag('ds', 'SignedInfo')) ds_canonicalmethod = etree.SubElement(ds_signedinfo, self.__get_namespaced_tag('ds', 'CanonicalizationMethod')) ds_canonicalmethod.set('Algorithm', self.CANONICAL_ALG_SPEC) ds_signaturemethod = etree.SubElement(ds_signedinfo, self.__get_namespaced_tag('ds', 'SignatureMethod')) ds_signaturemethod.set('Algorithm', self.SIGNATURE_ALG_SPEC) ds_reference_main = etree.SubElement(ds_signedinfo, self.__get_namespaced_tag('ds', 'Reference')) ds_reference_main.set('Type', self.xml_reference_type) ds_reference_main.set('URI', '#' + main_id.strip('#')) ds_refmain_digestmethod = etree.SubElement(ds_reference_main, self.__get_namespaced_tag('ds', 'DigestMethod')) ds_refmain_digestmethod.set('Algorithm', self.DIGEST_ALG_SPEC) ds_refmain_digestvalue = etree.SubElement(ds_reference_main, self.__get_namespaced_tag('ds', 'DigestValue')) ds_reference_signedprops = etree.SubElement(ds_signedinfo, self.__get_namespaced_tag('ds', 'Reference')) ds_reference_signedprops.set('Type', self.SIGNEDPROPS_REFERENCE_TYPE) ds_reference_signedprops.set('URI', '#' + signedprops_id.strip('#')) ds_refsp_digestmethod = etree.SubElement( ds_reference_signedprops, self.__get_namespaced_tag('ds', 'DigestMethod') ) ds_refsp_digestmethod.set('Algorithm', self.DIGEST_ALG_SPEC) ds_refsp_digestvalue = etree.SubElement(ds_reference_signedprops, self.__get_namespaced_tag('ds', 'DigestValue')) ds_signaturevalue = etree.SubElement(ds_signature, self.__get_namespaced_tag('ds', 'SignatureValue')) ds_keyinfo = etree.SubElement(ds_signature, self.__get_namespaced_tag('ds', 'KeyInfo')) ds_X509data = etree.SubElement(ds_keyinfo, self.__get_namespaced_tag('ds', 'X509Data')) etree.SubElement( ds_X509data, self.__get_namespaced_tag('ds', 'X509Certificate') ).text = self.__get_certificate_der(certificate) # Append signature object to signature and signature to root to inherit all used namespaces ds_signature.append(ds_object) self.xml_root.append(ds_signature) # We generate digest values after XML is fully composed # First we generate digest value for main elements in XML ds_refmain_digestvalue.text = self.__generate_digest_value(self.xml_root[:-1]) # Then we generate digest for SignedProperties contained inside signature object ds_refsp_digestvalue.text = self.__generate_digest_value(ds_signature[-1][0][:1]) # Sign XML with private key after we have completed SignedInfo with digests ds_signaturevalue.text = self.__get_signed_xml(private_key, ds_signature[:1]) return ds_signature def __get_signed_xml(self, private_key, nodes): """ Sign canonicalized XML with provided private key, generated from applied nodes :param private_key: OpenSSL.crypto.PKey :param nodes: list :return: str """ return b64encode(crypto.sign(private_key, self.__get_canonical_xml(nodes), self.SIGNATURE_ALG)).decode() def __get_certificate_der(self, certificate): """ Return certificate in DER format encoded in base64 :param certificate: OpenSSL.crypto.X509 :return: str """ return b64encode(crypto.dump_certificate(crypto.FILETYPE_ASN1, certificate)).decode() def __get_canonical_xml(self, nodes): """ Generate canonical XML from applied nodes and replace all unknown namespaces :param nodes: list :return: bytearray """ canonical_xml = b'' for node in nodes: canonical_xml += etree.tostring(node, method='c14n', with_comments=False) return canonical_xml.replace(b' xmlns=""', b'') def __generate_digest_value(self, nodes): """ Generate digest value for generated canonical XML from applied nodes :param nodes: list :return: str """ return XAdESSigner.base64_sha1_str(self.__get_canonical_xml(nodes)) def __create_signature_object(self, signature_id, certificate): """ Create signature object which must also be signed as part of XadES signature :param signature_id: str :param certificate: OpenSSL.crypto.X509 :return: tuple """ signedprops_id = 'SignedPropertiesId' ds_object = etree.Element(self.__get_namespaced_tag('ds', 'Object')) xds_qualifyingprops = etree.SubElement(ds_object, self.__get_namespaced_tag('xds', 'QualifyingProperties')) xds_qualifyingprops.set('Target', '#' + signature_id.strip('#')) xds_signedprops = etree.SubElement(xds_qualifyingprops, self.__get_namespaced_tag('xds', 'SignedProperties')) xds_signedprops.set('Id', signedprops_id) xds_signedsigprops = etree.SubElement( xds_signedprops, self.__get_namespaced_tag('xds', 'SignedSignatureProperties') ) etree.SubElement( xds_signedsigprops, self.__get_namespaced_tag('xds', 'SigningTime') ).text = datetime.utcnow().isoformat()[:-3] + 'Z' xds_signingcert = etree.SubElement(xds_signedsigprops, self.__get_namespaced_tag('xds', 'SigningCertificate')) xds_cert = etree.SubElement(xds_signingcert, self.__get_namespaced_tag('xds', 'Cert')) xds_certdigest = etree.SubElement(xds_cert, self.__get_namespaced_tag('xds', 'CertDigest')) xds_digestmethod = etree.SubElement(xds_certdigest, self.__get_namespaced_tag('xds', 'DigestMethod')) xds_digestmethod.set('Algorithm', self.DIGEST_ALG_SPEC) etree.SubElement( xds_certdigest, self.__get_namespaced_tag('xds', 'DigestValue') ).text = self.__get_certificate_digest(certificate) xds_issuerserial = etree.SubElement(xds_cert, self.__get_namespaced_tag('xds', 'IssuerSerial')) etree.SubElement( xds_issuerserial, self.__get_namespaced_tag('ds', 'X509IssuerName') ).text = self.__get_certificate_issuer(certificate) etree.SubElement( xds_issuerserial, self.__get_namespaced_tag('ds', 'X509SerialNumber') ).text = '%d' % certificate.get_serial_number() xds_signaturepolicyid = etree.SubElement( xds_signedsigprops, self.__get_namespaced_tag('xds', 'SignaturePolicyIdentifier') ) etree.SubElement(xds_signaturepolicyid, self.__get_namespaced_tag('xds', 'SignaturePolicyImplied')) return ds_object, signedprops_id def __get_namespaced_tag(self, namespace, tag): """ Generate tag with prefixed namespace for lxml element name :param namespace: str :param tag: str :return: str """ namespaces = self.__get_namespaces() if namespace not in namespaces: raise XaDESSignerValueError('Namespace unknown.') return '{%s}%s' % (namespaces[namespace], tag) def __get_namespaces(self): """ Wrapper method which returns available namespaces in dictionary form :return: dict """ return { 'ds': self.XMLNS_DS, 'xds': self.XMLNS_XDS } def __get_certificate_digest(self, certificate): """ Get certificate digest :param certificate: OpenSSL.crypto.X509 :return: str """ return XAdESSigner.base64_sha1_str(crypto.dump_certificate(crypto.FILETYPE_ASN1, certificate)) def __get_certificate_issuer(self, certificate): """ Get certificate issuer - consists of everything known about issuer, separated with commas :param certificate: OpenSSL.crypto.X509 :return: str """ return ','.join(m.decode() + '=' + v.decode() for m, v in certificate.get_issuer().get_components()) @staticmethod def base64_sha1_str(string): """ Hash string or bytearray to SHA1 and encode it with base64 :param string: str or byte :return: str """ try: string = string.encode() except AttributeError: pass return b64encode(sha1(string).digest()).decode() class XaDESSignerException(Exception): pass class XaDESSignerValueError(Exception): pass