Implementation
Writing and receiving Epistula requests requires minimal boilerplate, and can be done in any language.
Dependencies
pip install substrate-interface  # uuid is part of the Python standard library
Preparing Request
Generate Request Headers
import json
import time
from hashlib import sha256
from math import ceil
from typing import Annotated, Any, Dict, Optional
from uuid import uuid4

from substrateinterface import Keypair
def generate_header(
    hotkey: Keypair,
    body: bytes,
    signed_for: Optional[str] = None,
) -> Dict[str, Any]:
    """Build the Epistula v2 headers for a request body.

    Args:
        hotkey: Key used for signing; its ``ss58_address`` is sent as the
            sender and its ``sign`` method produces every signature.
        body: Raw request body. Only its SHA-256 hex digest is signed.
        signed_for: Optional receiver address. When truthy, the receiver
            header and three "secret" signatures are added as well.

    Returns:
        A dict mapping Epistula header names to their values.
    """
    timestamp = round(time.time() * 1000)  # milliseconds since the epoch
    request_id = str(uuid4())

    # Signed message layout: "<sha256(body)>.<uuid>.<timestamp>.<receiver>",
    # with an empty receiver part when no receiver is given.
    body_digest = sha256(body).hexdigest()
    request_message = f"{body_digest}.{request_id}.{timestamp}.{signed_for or ''}"

    headers = {
        "Epistula-Version": "2",
        "Epistula-Timestamp": str(timestamp),
        "Epistula-Uuid": request_id,
        "Epistula-Signed-By": hotkey.ss58_address,
        "Epistula-Request-Signature": "0x" + hotkey.sign(request_message).hex(),
    }
    if not signed_for:
        return headers

    headers["Epistula-Signed-For"] = signed_for

    # Timestamp rounded up to the next multiple of 10,000 ms. Multiplying by
    # 1e4 makes this a float, so the signed text carries a trailing ".0";
    # that formatting is part of the signed message and must not change.
    interval = ceil(timestamp / 1e4) * 1e4
    for index, offset in enumerate((-1, 0, 1)):
        secret_message = str(interval + offset) + "." + signed_for
        headers[f"Epistula-Secret-Signature-{index}"] = (
            "0x" + hotkey.sign(secret_message).hex()
        )
    return headers
# Example usage: serialize the payload once and sign those exact bytes.
body = {"key": "value"}
body_bytes = json.dumps(body).encode()  # str.encode() defaults to UTF-8
headers = generate_header(
    wallet.hotkey,
    body_bytes,
    signed_for=receiver_hotkey_ss58,
)
Send Request
# Send the exact bytes that were signed. Passing them via `json=` would hand
# raw bytes to the JSON encoder (which rejects bytes) instead of transmitting
# them untouched, so the receiver could never reproduce the signed body hash.
# `session` is presumably a requests.Session or a compatible client; confirm
# that it accepts `data=` for raw bodies.
session.post(
    url="http://localhost:4000/",
    # `data=` does not set a content type on its own, so declare it explicitly.
    headers={**headers, "Content-Type": "application/json"},
    data=body_bytes,
)
Receiving Request
Verify Signature
def verify_signature_v2(
    signature, body: bytes, timestamp, uuid, signed_for, signed_by, now
) -> Optional[Annotated[str, "Error Message"]]:
    """Validate the signature of an Epistula v2 request.

    Args:
        signature: Value of the ``Epistula-Request-Signature`` header.
        body: Raw request body; its SHA-256 hex digest is part of the
            signed message.
        timestamp: Value of the ``Epistula-Timestamp`` header
            (milliseconds); anything ``int()`` accepts.
        uuid: Value of the ``Epistula-Uuid`` header.
        signed_for: Value of the ``Epistula-Signed-For`` header.
        signed_by: Value of the ``Epistula-Signed-By`` header, used as the
            SS58 address of the signer.
        now: Current time in milliseconds.

    Returns:
        ``None`` when the request is valid, otherwise a short error message.
    """
    if not isinstance(signature, str):
        return "Invalid Signature"
    # A missing header (None) or a non-numeric value must be reported as an
    # error string, not escape as TypeError/ValueError from int().
    try:
        timestamp = int(timestamp)
    except (TypeError, ValueError):
        return "Invalid Timestamp"
    if not isinstance(signed_by, str):
        return "Invalid Sender key"
    if not isinstance(signed_for, str):
        return "Invalid receiver key"
    if not isinstance(uuid, str):
        return "Invalid uuid"
    if not isinstance(body, bytes):
        return "Body is not of type bytes"

    ALLOWED_DELTA_MS = 8000
    # Cheap staleness check first, before any key parsing or crypto work.
    # NOTE(review): only timestamps in the past are bounded; a timestamp far
    # in the future is not rejected here. Confirm whether that is intended.
    if timestamp + ALLOWED_DELTA_MS < now:
        return "Request is too stale"

    # NOTE(review): Keypair construction and verify() presumably raise
    # ValueError/TypeError on malformed addresses or signatures; confirm
    # against the installed substrate-interface version.
    try:
        keypair = Keypair(ss58_address=signed_by)
    except (TypeError, ValueError):
        return "Invalid Sender key"

    # Must match the layout produced by the sender:
    # "<sha256(body)>.<uuid>.<timestamp>.<receiver>".
    message = f"{sha256(body).hexdigest()}.{uuid}.{timestamp}.{signed_for}"
    try:
        verified = keypair.verify(message, signature)
    except (TypeError, ValueError):
        return "Signature Mismatch"
    if not verified:
        return "Signature Mismatch"
    return None
# Example usage inside a request handler: reject the request with HTTP 400
# when verification reports an error.
now = round(time.time() * 1000)  # current time in milliseconds
body = request.body  # assumed to be the raw request bytes
headers = request.headers
err = verify_signature_v2(
    signature=headers.get("Epistula-Request-Signature"),
    body=body,
    timestamp=headers.get("Epistula-Timestamp"),
    uuid=headers.get("Epistula-Uuid"),
    signed_for=headers.get("Epistula-Signed-For"),
    signed_by=headers.get("Epistula-Signed-By"),
    now=now,
)
if err:
    raise HTTPException(status_code=400, detail=err)