Implementation

Writing and receiving Epistula requests requires minimal boilerplate, and can be done in any language.

Dependencies

pip install substrate-interface  # uuid is part of the Python standard library; no separate install needed

Preparing Request

Generate Request Headers

import json
import time
from hashlib import sha256
from math import ceil
from typing import Annotated, Any, Dict, Optional
from uuid import uuid4

from substrateinterface import Keypair

def generate_header(
    hotkey: Keypair,
    body: bytes,
    signed_for: Optional[str] = None,
) -> Dict[str, Any]:
    """Build the Epistula (version "2") headers for a request body.

    The request signature covers the string
    ``<sha256(body) hex>.<uuid>.<timestamp ms>.<signed_for or "">``.

    Args:
        hotkey: Signing key; its ``sign`` method and ``ss58_address``
            attribute are used.
        body: Raw request body. Its SHA-256 hex digest is what gets signed.
        signed_for: Optional receiver identifier. When truthy, the
            ``Epistula-Signed-For`` header and three
            ``Epistula-Secret-Signature-{0,1,2}`` headers are added.

    Returns:
        A dict of header names to string values.
    """
    now_ms = round(time.time() * 1000)
    # Current time rounded up to a multiple of 10,000 ms. `1e4` is a float,
    # so this value is a float and str() renders it with a trailing ".0";
    # that text is what gets signed below, so the type is kept as-is.
    interval = ceil(now_ms / 1e4) * 1e4
    request_id = str(uuid4())

    body_digest = sha256(body).hexdigest()
    receiver = signed_for or ""
    request_signature = hotkey.sign(
        f"{body_digest}.{request_id}.{now_ms}.{receiver}"
    )

    headers = {
        "Epistula-Version": "2",
        "Epistula-Timestamp": str(now_ms),
        "Epistula-Uuid": request_id,
        "Epistula-Signed-By": hotkey.ss58_address,
        "Epistula-Request-Signature": "0x" + request_signature.hex(),
    }
    if not signed_for:
        return headers

    headers["Epistula-Signed-For"] = signed_for
    # Sign the interval just before, at, and just after the computed value.
    for index, offset in enumerate((-1, 0, 1)):
        secret_message = str(interval + offset) + "." + signed_for
        headers[f"Epistula-Secret-Signature-{index}"] = (
            "0x" + hotkey.sign(secret_message).hex()
        )
    return headers
  
  # Example usage: serialize a JSON payload once, then sign those exact bytes
  # for a specific receiver.
  body = {"key": "value"}
  body_bytes = json.dumps(body).encode("utf-8")
  headers = generate_header(
      wallet.hotkey,
      body_bytes,
      signed_for=receiver_hotkey_ss58,
  )

Send Request

# Send the exact bytes that were signed. Passing them via `json=` would ask
# the HTTP client to JSON-encode the value again, which fails for `bytes`
# and would not match the SHA-256 body digest covered by the signature.
# NOTE(review): assumes `session` is a requests-style session whose `post`
# accepts raw bytes through `data=` - confirm against the client in use.
session.post(
  url="http://localhost:4000/",
  # `data=` does not set a content type, so declare the JSON payload here.
  headers={**headers, "Content-Type": "application/json"},
  data=body_bytes,
)

Receiving Request

Verify Signature

def verify_signature_v2(
    signature, body: bytes, timestamp, uuid, signed_for, signed_by, now
) -> Optional[Annotated[str, "Error Message"]]:
    """Validate an Epistula v2 request signature.

    Args:
        signature: Value of the request-signature header (must be ``str``).
        body: Raw request body as bytes; its SHA-256 hex digest is part of
            the signed message.
        timestamp: Sender timestamp; anything ``int()`` accepts.
        uuid: Request UUID string.
        signed_for: Receiver key string the request was signed for.
        signed_by: Sender ss58 address used to build the verifying keypair.
        now: Current time, compared directly against ``timestamp`` plus the
            allowed delta (the example callers pass milliseconds).

    Returns:
        ``None`` when the signature verifies, otherwise a short error
        message describing the first check that failed.
    """
    if not isinstance(signature, str):
        return "Invalid Signature"
    # Convert defensively: a missing or non-numeric timestamp must yield an
    # error message, not an uncaught TypeError/ValueError.
    try:
        timestamp = int(timestamp)
    except (TypeError, ValueError, OverflowError):
        return "Invalid Timestamp"
    if not isinstance(signed_by, str):
        return "Invalid Sender key"
    if not isinstance(signed_for, str):
        return "Invalid receiver key"
    if not isinstance(uuid, str):
        return "Invalid uuid"
    if not isinstance(body, bytes):
        return "Body is not of type bytes"
    ALLOWED_DELTA_MS = 8000
    keypair = Keypair(ss58_address=signed_by)
    # Only staleness is rejected; timestamps ahead of `now` pass this check.
    if timestamp + ALLOWED_DELTA_MS < now:
        return "Request is too stale"
    # Must match the string the sender signed, field for field.
    message = f"{sha256(body).hexdigest()}.{uuid}.{timestamp}.{signed_for}"
    if not keypair.verify(message, signature):
        return "Signature Mismatch"
    return None


# Example usage inside a request handler: pull the Epistula headers off the
# incoming request and reject it with HTTP 400 if verification fails.
headers = request.headers
body = request.body  # Assuming this is bytes
now = round(time.time() * 1000)  # Current time in milliseconds

err = verify_signature_v2(
    signature=headers.get("Epistula-Request-Signature"),
    body=body,
    timestamp=headers.get("Epistula-Timestamp"),
    uuid=headers.get("Epistula-Uuid"),
    signed_for=headers.get("Epistula-Signed-For"),
    signed_by=headers.get("Epistula-Signed-By"),
    now=now,
)
if err:
    raise HTTPException(status_code=400, detail=err)