Skip to content

Commit bfb7a62

Browse files
Enable delegate request executions in network mode (#1553)
Co-authored-by: Ignacio Duart <iduartgomez@gmail.com>
1 parent 42c9e10 commit bfb7a62

File tree

22 files changed

+1874
-141
lines changed

22 files changed

+1874
-141
lines changed

Cargo.lock

Lines changed: 67 additions & 75 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

apps/freenet-ping/Cargo.lock

Lines changed: 57 additions & 13 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/core/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "freenet"
3-
version = "0.1.2"
3+
version = "0.1.3"
44
edition = "2021"
55
rust-version = "1.80"
66
publish = true

crates/core/src/client_events.rs

Lines changed: 58 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@ pub struct OpenRequest<'a> {
109109
pub request: Box<ClientRequest<'a>>,
110110
pub notification_channel: Option<UnboundedSender<HostResult>>,
111111
pub token: Option<AuthToken>,
112+
pub attested_contract: Option<ContractInstanceId>,
112113
}
113114

114115
impl Display for OpenRequest<'_> {
@@ -135,6 +136,7 @@ impl<'a> OpenRequest<'a> {
135136
request,
136137
notification_channel: None,
137138
token: None,
139+
attested_contract: None,
138140
}
139141
}
140142

@@ -147,6 +149,11 @@ impl<'a> OpenRequest<'a> {
147149
self.token = token;
148150
self
149151
}
152+
153+
pub fn with_attested_contract(mut self, contract: Option<ContractInstanceId>) -> Self {
154+
self.attested_contract = contract;
155+
self
156+
}
150157
}
151158

152159
pub trait ClientEventsProxy {
@@ -243,7 +250,13 @@ where
243250
contract,
244251
}))
245252
}
253+
QueryResult::DelegateResult { response, .. } => {
254+
response
255+
}
246256
};
257+
if let Ok(result) = &res {
258+
tracing::debug!(%result, "sending client operation response");
259+
}
247260
if let Err(err) = client_events.send(cli_id, res).await {
248261
tracing::debug!("channel closed: {err}");
249262
anyhow::bail!(err);
@@ -535,8 +548,48 @@ async fn process_open_request(
535548
}
536549
}
537550
}
538-
ClientRequest::DelegateOp(_op) => {
539-
todo!("FIXME: delegate op");
551+
ClientRequest::DelegateOp(req) => {
552+
tracing::debug!("Received delegate operation from user event");
553+
let delegate_key = req.key().clone();
554+
let attested_contract = request.attested_contract;
555+
556+
let res = match op_manager
557+
.notify_contract_handler(ContractHandlerEvent::DelegateRequest {
558+
req,
559+
attested_contract,
560+
})
561+
.await
562+
{
563+
Ok(ContractHandlerEvent::DelegateResponse(res)) => res,
564+
Err(err) => {
565+
tracing::error!("delegate operation failed: {}", err);
566+
return Err(Error::Contract(err));
567+
}
568+
Ok(_) => {
569+
tracing::error!("delegate operation failed: UnexpectedOpState");
570+
return Err(Error::Op(OpError::UnexpectedOpState));
571+
}
572+
};
573+
574+
let host_response = Ok(HostResponse::DelegateResponse {
575+
key: delegate_key.clone(),
576+
values: res,
577+
});
578+
579+
if let Some(ch) = &subscription_listener {
580+
if ch.send(host_response).is_err() {
581+
tracing::error!(
582+
"Failed to send delegate response through subscription channel"
583+
);
584+
}
585+
return Ok(None);
586+
}
587+
588+
// Return the response to be sent by client_event_handling
589+
return Ok(Some(Either::Left(QueryResult::DelegateResult {
590+
key: delegate_key,
591+
response: host_response,
592+
})));
540593
}
541594
ClientRequest::Disconnect { .. } => {
542595
unreachable!();
@@ -716,6 +769,7 @@ pub(crate) mod test {
716769
.into(),
717770
notification_channel: None,
718771
token: None,
772+
attested_contract: None,
719773
};
720774
return Ok(res.into_owned());
721775
} else if pk == self.key {
@@ -727,6 +781,7 @@ pub(crate) mod test {
727781
.into(),
728782
notification_channel: None,
729783
token: None,
784+
attested_contract: None,
730785
};
731786
return Ok(res.into_owned());
732787
}
@@ -819,6 +874,7 @@ pub(crate) mod test {
819874
.into(),
820875
notification_channel: None,
821876
token: None,
877+
attested_contract: None,
822878
};
823879
return Ok(res.into_owned());
824880
}

crates/core/src/client_events/combinator.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ impl<const N: usize> super::ClientEventsProxy for ClientEventsCombinator<N> {
7979
request,
8080
notification_channel,
8181
token,
82+
attested_contract,
8283
}) => {
8384
let id = *self.external_clients[idx]
8485
.entry(external)
@@ -94,6 +95,7 @@ impl<const N: usize> super::ClientEventsProxy for ClientEventsCombinator<N> {
9495
request,
9596
notification_channel,
9697
token,
98+
attested_contract
9799
})
98100
}
99101
err @ Err(_) => err,
@@ -153,9 +155,9 @@ async fn client_fn(
153155
}
154156
client_msg = client.recv() => {
155157
match client_msg {
156-
Ok(OpenRequest { client_id, request, notification_channel, token }) => {
158+
Ok(OpenRequest { client_id, request, notification_channel, token, attested_contract }) => {
157159
tracing::debug!("received msg @ combinator from external id {client_id}, msg: {request}");
158-
if tx_host.send(Ok(OpenRequest { client_id, request, notification_channel, token })).await.is_err() {
160+
if tx_host.send(Ok(OpenRequest { client_id, request, notification_channel, token, attested_contract })).await.is_err() {
159161
break;
160162
}
161163
}

crates/core/src/client_events/websocket.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ impl WebSocketProxy {
100100
client_id,
101101
req,
102102
auth_token,
103+
attested_contract,
103104
} => {
104105
let open_req = match &*req {
105106
ClientRequest::ContractOp(ContractRequest::Subscribe { key, .. }) => {
@@ -116,14 +117,17 @@ impl WebSocketProxy {
116117
OpenRequest::new(client_id, req)
117118
.with_notification(tx)
118119
.with_token(auth_token)
120+
.with_attested_contract(attested_contract)
119121
} else {
120122
tracing::warn!("client: {client_id} not found");
121123
return Err(ErrorKind::UnknownClient(client_id.into()).into());
122124
}
123125
}
124126
_ => {
125127
// just forward the request to the node
126-
OpenRequest::new(client_id, req).with_token(auth_token)
128+
OpenRequest::new(client_id, req)
129+
.with_token(auth_token)
130+
.with_attested_contract(attested_contract)
127131
}
128132
};
129133
Ok(Some(open_req))
@@ -332,6 +336,7 @@ async fn websocket_interface(
332336
next_msg,
333337
&request_sender,
334338
&mut auth_token.as_mut().map(|t| t.0.clone()),
339+
auth_token.as_mut().map(|t| t.1),
335340
encoding_protoc,
336341
)
337342
.await
@@ -416,6 +421,7 @@ async fn process_client_request(
416421
msg: Result<Message, axum::Error>,
417422
request_sender: &mpsc::Sender<ClientConnection>,
418423
auth_token: &mut Option<AuthToken>,
424+
attested_contract: Option<ContractInstanceId>,
419425
encoding_protoc: EncodingProtocol,
420426
) -> Result<Option<Message>, Option<anyhow::Error>> {
421427
let msg = match msg {
@@ -462,6 +468,7 @@ async fn process_client_request(
462468
client_id,
463469
req: Box::new(req),
464470
auth_token: auth_token.clone(),
471+
attested_contract,
465472
})
466473
.await
467474
.map_err(|err| Some(err.into()))?;

0 commit comments

Comments
 (0)