from Crypto.Hash import SHA256
from phe import paillier
from typing import List
from .utils import add, add_ec, multiply, rand, egcd, verify_dsa_signature, verify_ecdsa_signature
from .setup import DSASetup, ECDSASetup
from .network import Network, Client
[docs]class ThresholdSignature(Network):
clients: List[Client]
def __init__(self, N, C, setup=None, debug=False):
self.debug = debug
if setup is None:
self.dsa = DSASetup.generate_dsa_setup()
self.setup = DSASetup
super().__init__(N, self.dsa.q, self.dsa.h)
elif type(setup) == DSASetup:
self.dsa = setup
self.setup = DSASetup
super().__init__(N, self.dsa.q, self.dsa.h)
elif type(setup) == ECDSASetup:
self.ecdsa = setup.generate_ecdsa_setup()
self.setup = ECDSASetup
super().__init__(N, self.ecdsa.q, self.ecdsa.h)
else:
raise TypeError("Invalid type provided. "
"Please use either 'DSASetup' or 'ECDSASetup' types."
)
# Generate public and private keys for the paillier homomorphic encryption scheme
for i in range(C):
pub_key, priv_key = paillier.generate_paillier_keypair()
self.clients[i].he_private_key = priv_key
for node in self.nodes:
node.he_public_keys[i] = pub_key
for client in self.clients:
client.he_public_keys[i] = pub_key
[docs] def get_lambda(self, labels: list[str]) -> None:
"""
Emulates the generation of LAMBDA pairs :math:`([h^{\gamma}], [\gamma])` between all nodes.
Parameters:
labels (list[str]): A list of labels for which lambda values will be generated
and stored.
Returns:
None
"""
n = len(labels)
h = self.h
q = self.q
q_minus_one = q - 1
for l in range(n):
# Locally generate lambda
alpha = rand(q_minus_one)
h_alpha = pow(h, alpha, q)
self.share(alpha, q_minus_one, labels[l]+"_lambda_sh_exp")
self.share(h_alpha, q, labels[l]+"_lambda_sh_base")
[docs] def pow_share_protocol(self, base_type: str, get_label: str, save_label: str) -> None:
"""
Compute a power-sharing protocol among a group of nodes.
This function implements a one-round protocol to securely compute :math:`b^{s}` where
the exponent is a secret shared element between the nodes.
Parameters:
base_type (str): The type of base used: 'exp', when base to be used is self.h;
'base', when the base to be used is self.dsa.g. Note: 'base'
option can only be use for the DSA setup.
get_label (str): The label to retrieve shares of 's' from nodes.
save_label (str): The label to save the final result to.
Returns:
None
"""
if base_type not in ["exp", "base"]:
raise ValueError("{} is not one of the specified base types.\
Please choose one of the following:\n \
['exp', 'base']".format(base_type))
prime = self.q if base_type == "exp" else self.dsa.p
# Round 1
for node in self.nodes:
# DB management
exponent = node.get_share(get_label+"_sh_"+base_type)
# Step 1: compute base^share
if base_type == "exp":
h_exp = pow(self.h, exponent, prime)
else:
h_exp = pow(self.dsa.g, exponent, prime)
# Step 2: Broadcast base^share to nodes
self.broadcast(h_exp, "pow_share_node_"+str(node.id))
# All local
for node in self.nodes:
# DB management
base_exps = [
node.get_open("pow_share_node_"+str(other_node.id))
for other_node in self.nodes
]
# Step 3: multiply locally all powers of shares
val = multiply(base_exps, prime)
# DB management
node.set_open(val, save_label)
if not self.debug:
[node.delete_open("pow_share_node_"+str(other_node.id))
for other_node in self.nodes]
[docs] def ec_pow_share_protocol(self, get_label: str, save_label: str) -> None:
"""
Execute an elliptic curve (EC) version of power-sharing protocol.
This function implements a one-round protocol to securely compute
:math:`scalar\cdot G` where the scalar is a secret shared element between the nodes.
Parameters:
get_label (str): The label used to retrieve scalar shares from nodes.
save_label (str): The label used to save the result of the power-sharing protocol.
Returns:
None
"""
# Round 1
for node in self.nodes:
# DB management
scalar_sh = node.get_share(get_label+"_sh_base")
# Step 1:
sh_G = scalar_sh * self.ecdsa.G
# Step 2:
self.broadcast(sh_G, "ec_pow_share_node_"+str(node.id))
# All local
for node in self.nodes:
# DB management
base_exps = [
node.get_open("ec_pow_share_node_"+str(other_node.id))
for other_node in self.nodes
]
# Step 3: add locally all point shares
val = add_ec(base_exps)
# DB management
node.set_open(val, save_label)
if not self.debug:
[node.delete_open("ec_pow_share_node_"+str(other_node.id))
for other_node in self.nodes]
[docs] def subtract_exp_shares_local(self, label_a: str, label_b: str, label_r: str) -> None:
"""
Subtract the shares of the exponent of two labels and store the result in another label.
Parameters:
label_a (str): The label for the first operand.
label_b (str): The label for the second operand.
label_r (str): The label where the result is stored.
Returns:
None
"""
q_minus_one = self.q - 1
for node in self.nodes:
# DB management
share_a = node.get_share(label_a+"_sh_exp")
share_b = node.get_share(label_b+"_sh_exp")
# Local operation: subtraction
share_r = (share_a - share_b) % q_minus_one
# DB management
label = label_r+"_sh_exp"
node.set_share(share_r, label)
[docs] def pow_local(self, label_base: str, label_exponent: str, label_result: str) -> None:
"""
Compute the power of a base saved in open database raised to an exponent and store the result.
Parameters:
label_base (str): The label for the base.
label_exponent (str): The label for the exponent.
label_result (str): The label for the element where the result is stored.
Returns:
None
"""
for node in self.nodes:
# DB management
base = node.get_open(label_base)
exponent = node.get_open(label_exponent)
# Local operation: power
result = pow(base, exponent, self.dsa.p)
# DB management
node.set_open(result, label_result)
[docs] def key_agreement_protocol(self, label: str, delete=True) -> None:
"""
Perform a key agreement protocol to derive a mask of the secret key and the
corresponding public key.
Parameters:
label (str): The label of the pair associated with the secret key mask.
delete (bool, optional): Whether to delete intermediate data after the protocol.
Defaults to True.
Returns:
None
"""
q_minus_one = self.q - 1
# Round 1
# Step 1:
random_label = "random"
self.rss_protocol(q_minus_one, random_label)
# Round 2
# Step 2:
random_minus_label = random_label + "_minus_" + label
self.subtract_exp_shares_local(random_label, label + "_lambda", random_minus_label)
base_type_exp = "exp"
self.pow_share_protocol(base_type_exp, random_minus_label, label + "_sk")
if self.setup == DSASetup:
# Step 3:
base_type_base = "base"
self.pow_share_protocol(base_type_base, label + "_lambda", label + "_pre_pk")
# Step 4:
self.pow_local(label + "_pre_pk", label + "_sk", label + "_pk")
else:
# Step 3:
self.ec_pow_share_protocol(label + "_lambda", label + "_pre_pk")
# Step 4:
self.ec_mult_local(label + "_pre_pk", label + "_sk", label + "_pk")
# DB management
## Option only for testing purposes
if delete:
[node.delete_share(random_minus_label+"_sh_exp") for node in self.nodes]
[node.delete_share(random_label+"_sh_exp") for node in self.nodes]
[node.delete_open(label + "_pre_pk") for node in self.nodes]
[docs] def ec_mult_local(self, label_ec_point: str, label_scalar: str, label_result: str) -> None:
"""
Compute the multiplication of a scalar value with an elliptic point curve
and store the result.
Parameters:
label_ec_point (str): The label for the elliptic curve point.
label_scalar (str): The label for the scalar.
label_result (str): The label for the element where the result is stored.
Returns:
None
"""
for node in self.nodes:
# DB management
ec_point = node.get_open(label_ec_point)
scalar = node.get_open(label_scalar)
# Local operation: mult
result = scalar * ec_point
# DB management
node.set_open(result, label_result)
[docs] def encrypt_and_delete_exp_sh_local(self, label: str, client_id: int) -> None:
"""
Encrypt the share of the exponent element of the LAMBDA pair and delete the original
LAMBDA pair.
Parameters:
label (str): The label for LAMBDA pair.
client_id (int): Client id.
Returns:
None
"""
for node in self.nodes:
# DB management
clear_share = node.get_share(label+"_lambda_sh_exp")
# Local operation:
## Encrypt share
enc_sh_val = node.he_public_keys[client_id - 1].encrypt(clear_share)
## Delete lambda pair
node.delete_share(label+"_lambda_sh_exp")
node.delete_share(label+"_lambda_sh_base")
# DB management
sh_label = label+"_enc_sh_exp"
node.set_share(enc_sh_val, sh_label)
[docs] def send_public_key_to_client(self, label: str, client: Client) -> None:
"""
Nodes send public key to client.
Parameters:
label (str): The label for LAMBDA pair.
client_id (int): Client id.
Returns:
None
"""
all_y = [node.get_open(label+"_pk") for node in self.nodes]
# Check if all elements in the list are equal
are_all_equal = all(y == all_y[0] for y in all_y)
if are_all_equal:
client.set_open(all_y[0], label+"_pk")
else:
raise PublicKeyDisagreement("Abort.")
[docs] def distributed_key_generation_protocol(self, client_id: int, label=None) -> None:
"""
Execute a distributed key generation protocol for a specific client.
Parameters:
client_id (int): The unique identifier for the client.
label (str, optional): A custom label associated with the client. Defaults to None.
Returns:
None
"""
# Check there exist a client
client = next((client for client in self.clients if client.id == client_id), None)
if client == None:
raise TypeError(f"Client with id {client_id} is not part of the network.")
label = str(client_id)+"th_client_"+str(label) if label else str(client_id)+"th_client_"+"x"
delete = not self.debug
# Step 1
self.get_lambda([label])
# Step 2
self.key_agreement_protocol(label, delete=delete)
# Step 3
self.send_public_key_to_client(label, client)
# Step 4
self.encrypt_and_delete_exp_sh_local(label, client_id)
[docs] def compute_r_local(self, label: str, client: Client, delete=True) -> None:
"""
Compute r.
Parameters:
label (str): The label of the r element.
client (Client): A client.
Returns:
None
"""
for node in self.nodes:
# DB management
R = node.get_open(label + "_pk")
# Local operation
r = R % self.q if self.setup == DSASetup else int(R.x)
# DB management
node.set_open(r, label + "_r")
node.delete_open(label + "_pk")
client.set_open(r, label + "_r")
[docs] def invert_masked_factor_local(self, label) -> None:
"""
Invert a masked factor.
Parameters:
label (str): The label of the masked factor to be inverted.
Returns:
None
"""
for node in self.nodes:
# DB management
masked_factor = node.get_open(label+"_sk")
share = node.get_share(label+"_lambda_sh_exp")
# Local operation
## Invert masked factor
inv_masked_factor = egcd(masked_factor, self.q)
## Invert share
inv_share = -share % (self.q - 1)
# DB management
node.set_open(inv_masked_factor, label+"_inv_sk")
sh_inv_label = label+"_inv_lambda_sh_exp"
node.set_share(inv_share, sh_inv_label)
[docs] def step_4_encrypt_elements(
self,
label_lambda_1: str,
label_lambda_2: str,
labdel_lambda_k_inv: str,
save_label_m: str,
save_label_gap: str,
save_label_lambda_1: str,
save_label_lambda_2: str,
client_id: int
) -> None:
"""
Step 4 of the Threshold Signing protocol.
Parameters:
label (str): The label of the masked factor to be inverted.
label_lambda_1 (str): The label of lambda 1.
label_lambda_2 (str): The label of lambda 2.
labdel_lambda_k_inv (str): The label of :math:`k^{-1}`.
save_label_m (str): The label to save encrypted m.
save_label_gap (str): The label to save :math:`\lambda_{\text{gap}}`.
save_label_lambda_1 (str): The label to save lambda 1.
save_label_lambda_2 (str): The label to save lambda 2.
client_id: int
Returns:
None
"""
q_minus_one = self.q - 1
for node in self.nodes:
# DB management
sh_lambda_1_exp = node.get_share(label_lambda_1 +"_sh_exp")
sh_lambda_2_exp = node.get_share(label_lambda_2 +"_sh_exp")
sh_lambda_k_inv = node.get_share(labdel_lambda_k_inv +"_sh_exp")
sh_lambda_1_base = node.get_share(label_lambda_1 +"_sh_base")
sh_lambda_2_base = node.get_share(label_lambda_2 +"_sh_base")
enc_lambda_sk = node.get_share(str(client_id)+"th_client_x_enc_sh_exp")
# Local operation
## 4(a)
sh_m = (sh_lambda_1_exp - sh_lambda_k_inv) % q_minus_one
enc_sh_m = node.he_public_keys[client_id - 1].encrypt(sh_m)
## 4(b)
sh_int_gap = (sh_lambda_k_inv - sh_lambda_2_exp) % q_minus_one
enc_sh_int_gap = node.he_public_keys[client_id - 1].encrypt(sh_int_gap)
enc_sh_gap = enc_sh_int_gap + enc_lambda_sk
## 4(c)
enc_sh_lambda_1_base = node.he_public_keys[client_id - 1].encrypt(sh_lambda_1_base)
enc_sh_lambda_2_base= node.he_public_keys[client_id - 1].encrypt(sh_lambda_2_base)
# DB management
node.set_share(enc_sh_m, save_label_m+"_sh_exp")
node.set_share(enc_sh_gap, save_label_gap+"_sh_exp")
node.set_share(enc_sh_lambda_1_base, save_label_lambda_1+"_sh_base")
node.set_share(enc_sh_lambda_2_base, save_label_lambda_2+"_sh_base")
[docs] def delete_shares(self, list: List) -> None:
"""
Delete a set of shares.
Parameters:
list (List): List of shares to delete.
Returns:
None
"""
for node in self.nodes:
for element in list:
node.delete_share(element)
[docs] def decrypt_and_reconstruct_local(
self,
get_label: str,
save_label: str,
client: Client
) -> None:
"""
Decryption and reconstruction executed by the client.
Parameters:
get_label (str): The label of the shares to be dencrypted and reconstructed.
save_label (str): The label used to save the result.
client_id (int): The unique identifier for the client.
Returns:
None
"""
# DB management
enc_sh_per_node = [client.get_share(get_label+"_sh_exp_node_"+str(node.id)) for node in self.nodes]
# Local operation
## Decrypt
dec_sh_per_node = [client.he_private_key.decrypt(enc_sh) for enc_sh in enc_sh_per_node]
q_minus_one = self.q - 1
## Reconstruct and take the symmetric value
dec_val = add(dec_sh_per_node, q_minus_one)
# DB management
dec_label = save_label + "_exp"
client.set_share(dec_val, dec_label)
[client.delete_share(get_label+"_sh_exp_node_"+str(node.id)) for node in self.nodes] if not self.debug else None
[docs] def ts_prep_protocol(self, client_id):
"""
Execute the preprocessing phase of the threshold signature protocol for a specific client.
Parameters:
client_id (int): The unique identifier for the client.
Returns:
None
Raises:
TypeError: If the client with the provided 'client_id' is not part of the network.
KeyError: If the public key is not complete for the specified client.
"""
# Check there exist a client
client = next((client for client in self.clients if client.id == client_id), None)
if client == None:
raise TypeError(f"Client with id {client_id} is not part of the network.")
# Check there exist client public key triple (<x>, y, Enc([\lambda_x]))
try:
for node in self.nodes:
node.get_open(str(client_id)+"th_client_x_sk")
node.get_open(str(client_id)+"th_client_x_pk")
node.get_share(str(client_id)+"th_client_x_enc_sh_exp")
except KeyError:
print(f"Public key triple (<x>, y, Enc([\lambda_x])) from DKG is not complete for client {client_id}. Generate it first using 'distributed_key_generation_protocol({client_id})'")
# Signers preprocessing
# Step 1
label_k = str(client_id)+"th_client_k"
label_lambda_1 = str(client_id)+"th_client_lambda_1"
label_lambda_2 = str(client_id)+"th_client_lambda_2"
self.get_lambda([label_k, label_lambda_1, label_lambda_2])
# Step 2
self.key_agreement_protocol(label_k)
# Step 3(a): set r
self.compute_r_local(label_k, client)
# Step 3(b): invert k
self.invert_masked_factor_local(label_k)
# Step 4: encrypt
self.step_4_encrypt_elements(
label_lambda_1 + "_lambda",
label_lambda_2 + "_lambda",
label_k + "_inv_lambda",
str(client_id)+"th_client_m_lambda_enc",
str(client_id)+"th_client_gap_lambda_enc",
str(client_id)+"th_client_lambda_1_enc" ,
str(client_id)+"th_client_lambda_2_enc" ,
client_id)
# Step 5: delete
self.delete_shares([
str(client_id)+"th_client_k_lambda_sh_exp",
str(client_id)+"th_client_k_lambda_sh_base",
str(client_id)+"th_client_lambda_1_lambda_sh_exp",
str(client_id)+"th_client_lambda_1_lambda_sh_base",
str(client_id)+"th_client_lambda_2_lambda_sh_exp",
str(client_id)+"th_client_lambda_2_lambda_sh_base",
str(client_id)+"th_client_k_inv_lambda_sh_exp",
])
# Client preprocessing
# Step 6: send encryption
label_gap = "gap_lambda"
label_send_gap = str(client_id)+"th_client_"+ label_gap +"_enc"
label_m = "m_lambda"
label_send_m = str(client_id)+"th_client_"+ label_m +"_enc"
type_share = "exp"
self.send(type_share, label_send_gap, client, delete=True)
self.send(type_share, label_send_m, client, delete=True)
# Step 7: client decrypts and reconstructs
self.decrypt_and_reconstruct_local(label_send_gap, label_gap, client)
self.decrypt_and_reconstruct_local(label_send_m, label_m, client)
[docs] def broadcast_masked_message_digest(self, message: str, client: Client) -> None:
"""
Broadcasts a masked message digest to the client.
Parameters:
message (str): The input message to be hashed and masked.
client (Client): An instance of the client participating in the protocol.
Returns:
None
"""
# DB management
m_lambda_exp = client.get_share("m_lambda_exp")
gap_lambda_exp = client.get_share("gap_lambda_exp")
# Local operation
## Compute message
message_digest = SHA256.new(data=message.encode("utf-8"))
m = int(message_digest.hexdigest(), 16) % self.q
## Compute gap particle
minus_m_plus_gap = (-(m_lambda_exp + gap_lambda_exp)) % (self.q - 1)
gap_particle = (m * pow(self.h, minus_m_plus_gap, self.q)) % self.q
# Broadcast
self.broadcast(gap_particle, str(client.id)+"th_client_gap_particle_m")
[docs] def sign_local(self, client_id: int, delete=True):
"""
Sign a message locally and optionally delete intermediate shares.
Parameters:
client_id (int): The unique identifier of the client.
delete (bool, optional): A flag indicating whether to delete intermediate shares after signing (default is True).
Returns:
None
"""
q = self.q
for node in self.nodes:
# DB management
enc_sh_lambda_1 = node.get_share(str(client_id)+"th_client_lambda_1_enc_sh_base")
enc_sh_lambda_2 = node.get_share(str(client_id)+"th_client_lambda_2_enc_sh_base")
p_k_inv = node.get_open(str(client_id)+"th_client_k_inv_sk")
p_x = node.get_open(str(client_id)+"th_client_x_sk")
p_r = node.get_open(str(client_id)+"th_client_k_r")
p_gap_m = node.get_open(str(client_id)+"th_client_gap_particle_m")
# Local operation
scalar_k_m = (p_k_inv * p_gap_m) % q
scalar_k_r_x = (((p_k_inv * p_r) % q) * p_x) % q
enc_sh_s_gap = enc_sh_lambda_1 * scalar_k_m + enc_sh_lambda_2 * scalar_k_r_x
# DB management
node.set_share(enc_sh_s_gap, str(client_id)+"th_client_enc_signature_sh_base")
if delete:
node.delete_open(str(client_id)+"th_client_k_sk")
[docs] def reconstruct_and_verify_sig(self, message: str, get_label: str, client: Client, delete=True):
"""
Reconstructs and verifies a client's digital signature for a given message.
Parameters:
message (str): The input message for which the signature is to be reconstructed and verified.
get_label (str): The label used to retrieve the client's signature share from the database.
client (Client): An instance of the client for which the signature is reconstructed and verified.
delete (bool, optional): A flag indicating whether to delete intermediate shares after verification (default is True).
Returns:
None: This function doesn't return a value; it verifies the signature and potentially deletes intermediate shares.
"""
q = self.q
if self.setup == DSASetup:
p = self.dsa.p
g = self.dsa.g
else:
G = self.ecdsa.G
# DB management
gap_lambda_exp = client.get_share("gap_lambda_exp")
y = client.get_open(str(client.id)+"th_client_x_pk")
r = client.get_open(str(client.id)+"th_client_k_r")
s_h_gap = client.get_share(get_label)
# Compute signature
s = (s_h_gap * pow(self.h, gap_lambda_exp, self.q)) % self.q
# Verify signature
verify_dsa_signature(message, r, s, y, p, q, g) if self.setup == DSASetup else verify_ecdsa_signature(message, r, s, y, q, G)
# DB management
signature_label = str(client.id)+"th_client_s"
client.set_open(s, signature_label)
message_label = str(client.id)+"th_client_message"
client.set_open(message, message_label)
[docs] def decrypt_reconstruct_unmask_verify_sig_local(self, message: str, get_label: str, client: Client, delete=True):
"""
Reconstructs and verifies a client's digital signature for a given message.
Parameters:
message (str): The input message for which the signature is to be reconstructed and verified.
get_label (str): The label of the shares to be dencrypted and reconstructed.
client (Client): An instance of the client for which the signature is reconstructed and verified.
delete (bool, optional): A flag indicating whether to delete intermediate shares after verification (default is True).
Returns:
None: This function doesn't return a value; it verifies the signature and potentially deletes intermediate shares.
"""
q = self.q
if self.setup == DSASetup:
p = self.dsa.p
g = self.dsa.g
else:
G = self.ecdsa.G
# DB management
enc_sh_per_node = [client.get_share(str(client.id)+"th_client_"+get_label+"_sh_base_node_"+str(node.id)) for node in self.nodes]
gap_lambda_exp = client.get_share("gap_lambda_exp")
y = client.get_open(str(client.id)+"th_client_x_pk")
r = client.get_open(str(client.id)+"th_client_k_r")
# Local operation
## Decrypt
dec_sh_per_node = [client.he_private_key.decrypt(enc_sh) for enc_sh in enc_sh_per_node]
q_minus_one = self.q - 1
## Reconstruct
s_h_gap = add(dec_sh_per_node, q)
## Unmask
s = (s_h_gap * pow(self.h, gap_lambda_exp, q)) % q
# Verify signature
verify_dsa_signature(message, r, s, y, p, q, g) if self.setup == DSASetup else verify_ecdsa_signature(message, r, s, y, q, G)
# DB management
signature_label = str(client.id)+"th_client_s"
client.set_open(s, signature_label)
message_label = str(client.id)+"th_client_message"
client.set_open(message, message_label)
[docs] def ts_online_protocol(self, message: str, client_id: int) -> None:
"""
Executes the online phase of the threshold signature protocol for a specific client.
Parameters:
message (str): The message to be signed by the client.
client_id (int): The unique identifier of the client participating in the protocol.
Returns:
None
"""
# Check there exist a client
client = next((client for client in self.clients if client.id == client_id), None)
if client == None:
raise TypeError(f"Client with id {client_id} is not part of the network.")
# Check there 'ts_prep_protocol' was run
try:
for node in self.nodes:
node.get_open(str(client_id)+"th_client_k_inv_sk")
node.get_open(str(client_id)+"th_client_k_r")
client.get_share("gap_lambda_exp")
client.get_share("m_lambda_exp")
except KeyError:
print(f"The preprocessing phase was not run for client {client_id}.")
# Step 8: compute digest, mask it, include gap and broadcast the result to all nodes
self.broadcast_masked_message_digest(message, client)
# Step 9a: all nodes compute locally the shares corresponding to clients
delete = not self.debug
self.sign_local(client_id, delete=delete)
# Step 9b: send encryption
label_enc_sig = "enc_signature"
label_send_enc_sig = str(client_id)+"th_client_" + label_enc_sig
type_share = "base"
self.send(type_share, label_send_enc_sig, client, delete=True)
# Step 10: client decrypts, reconstructs, unmasks and verifies signature
self.decrypt_reconstruct_unmask_verify_sig_local(message, label_enc_sig, client)
[docs] def print_signature(self, client_id: int) -> None:
# Check there exist a client
client = next((client for client in self.clients if client.id == client_id), None)
if client == None:
raise TypeError(f"Client with id {client_id} is not part of the network.")
# Check there exist client public key triple (<x>, y, Enc([\lambda_x]))
try:
r = client.get_open(str(client.id)+"th_client_k_r")
s = client.get_open(str(client.id)+"th_client_s")
m = client.get_open(str(client.id)+"th_client_message")
except KeyError:
print(f"Signature not generated for client {client_id}.'")
print(f" Client(id={client_id},")
print(f" r={r},")
print(f" s={s},")
print(f" m={m},\n )")
[docs] def retrieve_signature(self, client_id: int) -> (int, int, str):
# Check there exist a client
client = next((client for client in self.clients if client.id == client_id), None)
if client == None:
raise TypeError(f"Client with id {client_id} is not part of the network.")
# Check there exist client public key triple (<x>, y, Enc([\lambda_x]))
try:
r = client.get_open(str(client.id)+"th_client_k_r")
s = client.get_open(str(client.id)+"th_client_s")
m = client.get_open(str(client.id)+"th_client_message")
except KeyError:
print(f"Signature not generated for client {client_id}.'")
return r, s, m
[docs]class PublicKeyDisagreement(Exception):
def __init__(self, message):
self.message = f"Public keys are not consistent. {message}"
super().__init__(self.message)