Commit 73f85c5

Commit message: WIP

1 parent: a65997a

8 files changed: 239 additions, 114 deletions

quickwit/quickwit-ingest/src/ingest_v2/ingester.rs

Lines changed: 107 additions & 67 deletions
```diff
@@ -20,7 +20,6 @@
 use std::collections::hash_map::Entry;
 use std::collections::HashMap;
 use std::fmt;
-use std::iter::once;
 use std::path::Path;
 use std::sync::Arc;
 use std::time::Duration;
@@ -49,8 +48,9 @@ use tracing::{error, info, warn};
 
 use super::fetch::FetchStreamTask;
 use super::models::IngesterShard;
-use super::mrecord::MRecord;
-use super::mrecordlog_utils::{append_eof_record_if_necessary, check_enough_capacity};
+use super::mrecordlog_utils::{
+    append_doc_batch, append_eof_record_if_necessary, check_enough_capacity, AppendDocBatchError,
+};
 use super::rate_limiter::{RateLimiter, RateLimiterSettings};
 use super::replication::{
     ReplicationStreamTask, ReplicationStreamTaskHandle, ReplicationTask, ReplicationTaskHandle,
@@ -101,6 +101,49 @@ pub(super) struct IngesterState {
     pub observation_tx: watch::Sender<IngestV2Result<ObservationMessage>>,
 }
 
+impl IngesterState {
+    async fn close_shards(&mut self, queue_ids: &[QueueId]) {
+        for queue_id in queue_ids {
+            append_eof_record_if_necessary(&mut self.mrecordlog, queue_id).await;
+
+            if let Some(shard) = self.shards.get_mut(queue_id) {
+                shard.shard_state = ShardState::Closed;
+                shard.notify_new_records();
+            }
+        }
+        // TODO: Handle replicated shards.
+    }
+
+    pub async fn decommission(&mut self) {
+        let queue_ids: Vec<QueueId> = self.shards.keys().cloned().collect();
+        self.close_shards(&queue_ids).await;
+
+        self.status = IngesterStatus::Decommissioning;
+        self.check_decommissioning_status();
+    }
+
+    /// Checks whether the ingester is fully decommissioned and updates its status accordingly.
+    fn check_decommissioning_status(&mut self) {
+        if self.status != IngesterStatus::Decommissioning {
+            return;
+        }
+        if self.shards.values().all(|shard| {
+            shard.shard_state.is_closed() && shard.truncation_position_inclusive == Position::Eof
+        }) {
+            info!("ingester fully decommissioned");
+            self.status = IngesterStatus::Decommissioned;
+
+            self.observation_tx.send_if_modified(|observation_result| {
+                if let Ok(observation) = observation_result {
+                    observation.status = IngesterStatus::Decommissioned as i32;
+                    return true;
+                }
+                false
+            });
+        }
+    }
+}
+
 impl Ingester {
     pub async fn try_new(
         self_node_id: NodeId,
```
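The new `impl IngesterState` block moves shard closing and the decommissioning check onto the state itself. The check boils down to a small state machine: an ingester that is `Decommissioning` becomes `Decommissioned` only once every shard is closed and truncated up to EOF. A minimal stand-alone sketch of that rule, using simplified stand-in types rather than the actual Quickwit ones:

```rust
// Simplified model of the rule implemented by `check_decommissioning_status`:
// while decommissioning, the status flips to `Decommissioned` only once every
// shard is closed and truncated up to EOF. Types here are stand-ins.
#[derive(Debug, PartialEq)]
enum Status {
    Ready,
    Decommissioning,
    Decommissioned,
}

struct Shard {
    closed: bool,
    truncated_to_eof: bool,
}

fn check_decommissioning_status(status: &mut Status, shards: &[Shard]) {
    if *status != Status::Decommissioning {
        return;
    }
    if shards
        .iter()
        .all(|shard| shard.closed && shard.truncated_to_eof)
    {
        *status = Status::Decommissioned;
    }
}

fn main() {
    let mut status = Status::Ready;
    let shards = vec![Shard { closed: true, truncated_to_eof: true }];

    // No-op unless the ingester is currently decommissioning.
    check_decommissioning_status(&mut status, &shards);
    assert_eq!(status, Status::Ready);

    status = Status::Decommissioning;
    check_decommissioning_status(&mut status, &shards);
    assert_eq!(status, Status::Decommissioned);
}
```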
```diff
@@ -158,27 +201,6 @@ impl Ingester {
         Ok(ingester)
     }
 
-    /// Checks whether the ingester is fully decommissioned and updates its status accordingly.
-    fn check_decommissioning_status(&self, state: &mut IngesterState) {
-        if state.status != IngesterStatus::Decommissioning {
-            return;
-        }
-        if state.shards.values().all(|shard| {
-            shard.shard_state.is_closed() && shard.truncation_position_inclusive == Position::Eof
-        }) {
-            info!("ingester fully decommissioned");
-            state.status = IngesterStatus::Decommissioned;
-
-            state.observation_tx.send_if_modified(|observation_result| {
-                if let Ok(observation) = observation_result {
-                    observation.status = IngesterStatus::Decommissioned as i32;
-                    return true;
-                }
-                false
-            });
-        }
-    }
-
     async fn init(&self) -> IngestV2Result<()> {
         let mut state_guard = self.state.write().await;
 
@@ -246,18 +268,6 @@ impl Ingester {
         Ok(entry.or_insert(shard))
     }
 
-    async fn close_shards_inner(&self, state: &mut IngesterState, queue_ids: &[QueueId]) {
-        for queue_id in queue_ids {
-            append_eof_record_if_necessary(&mut state.mrecordlog, queue_id).await;
-
-            if let Some(shard) = state.shards.get_mut(queue_id) {
-                shard.shard_state = ShardState::Closed;
-                shard.notify_new_records();
-            }
-        }
-        // TODO: Handle replicated shards.
-    }
-
     async fn init_replication_stream(
         &self,
         state: &mut IngesterState,
@@ -322,6 +332,7 @@ impl IngesterService for Ingester {
         let mut persist_successes = Vec::with_capacity(persist_request.subrequests.len());
         let mut persist_failures = Vec::new();
         let mut replicate_subrequests: HashMap<NodeId, Vec<ReplicateSubrequest>> = HashMap::new();
+        let mut should_decommission = false;
 
         let commit_type = persist_request.commit_type();
         let force_commit = commit_type == CommitTypeV2::Force;
@@ -350,6 +361,17 @@ impl IngesterService for Ingester {
             return Ok(persist_response);
         }
         for subrequest in persist_request.subrequests {
+            if should_decommission {
+                let persist_failure = PersistFailure {
+                    subrequest_id: subrequest.subrequest_id,
+                    index_uid: subrequest.index_uid,
+                    source_id: subrequest.source_id,
+                    shard_id: subrequest.shard_id,
+                    reason: PersistFailureReason::ShardClosed as i32,
+                };
+                persist_failures.push(persist_failure);
+                continue;
+            }
             let queue_id = subrequest.queue_id();
             let follower_id_opt: Option<NodeId> = subrequest.follower_id.map(Into::into);
             let shard = if let Some(shard) = state_guard.shards.get_mut(&queue_id) {
@@ -435,25 +457,41 @@ impl IngesterService for Ingester {
                 persist_failures.push(persist_failure);
                 continue;
             }
-            let current_position_inclusive: Position = if force_commit {
-                let encoded_mrecords = doc_batch
-                    .docs()
-                    .map(|doc| MRecord::Doc(doc).encode())
-                    .chain(once(MRecord::Commit.encode()));
-                state_guard
-                    .mrecordlog
-                    .append_records(&queue_id, None, encoded_mrecords)
-                    .await
-                    .expect("TODO") // TODO: Io error, close shard?
-            } else {
-                let encoded_mrecords = doc_batch.docs().map(|doc| MRecord::Doc(doc).encode());
-                state_guard
-                    .mrecordlog
-                    .append_records(&queue_id, None, encoded_mrecords)
-                    .await
-                    .expect("TODO") // TODO: Io error, close shard?
-            }
-            .into();
+            let append_result = append_doc_batch(
+                &mut state_guard.mrecordlog,
+                &queue_id,
+                &doc_batch,
+                force_commit,
+            )
+            .await;
+
+            let current_position_inclusive = match append_result {
+                Ok(current_position_inclusive) => current_position_inclusive,
+                Err(append_error) => {
+                    let reason = match &append_error {
+                        AppendDocBatchError::Io(_) => {
+                            error!(
+                                "failed to append records to shard `{queue_id}`: {append_error}"
+                            );
+                            should_decommission = true;
+                            PersistFailureReason::ShardClosed
+                        }
+                        AppendDocBatchError::QueueNotFound(_) => {
+                            warn!("{append_error}");
+                            PersistFailureReason::ShardNotFound
+                        }
+                    };
+                    let persist_failure = PersistFailure {
+                        subrequest_id: subrequest.subrequest_id,
+                        index_uid: subrequest.index_uid,
+                        source_id: subrequest.source_id,
+                        shard_id: subrequest.shard_id,
+                        reason: reason as i32,
+                    };
+                    persist_failures.push(persist_failure);
+                    continue;
+                }
+            };
             let batch_num_bytes = doc_batch.num_bytes() as u64;
             let batch_num_docs = doc_batch.num_docs() as u64;
 
@@ -549,6 +587,7 @@ impl IngesterService for Ingester {
                 // already.
                 let persist_failure_reason = match replicate_failure.reason() {
                     ReplicateFailureReason::Unspecified => PersistFailureReason::Unspecified,
+                    ReplicateFailureReason::ShardNotFound => PersistFailureReason::ShardNotFound,
                     ReplicateFailureReason::ShardClosed => PersistFailureReason::ShardClosed,
                     ReplicateFailureReason::ResourceExhausted => {
                         PersistFailureReason::ResourceExhausted
@@ -564,7 +603,13 @@ impl IngesterService for Ingester {
                 persist_failures.push(persist_failure);
             }
         }
+        if should_decommission {
+            error!("decommissioning ingester after IO error");
+            let mut state_guard = self.state.write().await;
+            state_guard.decommission().await;
+        }
        let leader_id = self.self_node_id.to_string();
+
        let persist_response = PersistResponse {
            leader_id,
            successes: persist_successes,
```
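The persist changes above follow a fail-fast pattern: the first IO error on append sets `should_decommission`, every remaining subrequest in the batch is rejected with a `ShardClosed` failure, and the ingester decommissions itself once the loop is done. A minimal sketch of that pattern with placeholder types, not the actual Quickwit request/response structs:

```rust
// Placeholder model of the fail-fast flag used in `persist`: stop appending
// after the first IO error, reject every remaining request, then act on the
// flag once, after the loop. Types here are stand-ins.
use std::io;

#[derive(Debug)]
enum Outcome {
    Persisted(u64),
    Rejected,
}

// Pretend WAL append: the second document hits an IO error.
fn append(doc: &str) -> io::Result<u64> {
    if doc == "doc-1" {
        Err(io::Error::new(io::ErrorKind::Other, "disk full"))
    } else {
        Ok(doc.len() as u64)
    }
}

fn persist(docs: &[&str]) -> (Vec<Outcome>, bool) {
    let mut outcomes = Vec::with_capacity(docs.len());
    let mut should_decommission = false;
    for doc in docs {
        if should_decommission {
            // Once the flag is set, reject without touching the WAL again.
            outcomes.push(Outcome::Rejected);
            continue;
        }
        match append(doc) {
            Ok(offset) => outcomes.push(Outcome::Persisted(offset)),
            Err(_) => {
                should_decommission = true;
                outcomes.push(Outcome::Rejected);
            }
        }
    }
    (outcomes, should_decommission)
}

fn main() {
    let (outcomes, should_decommission) = persist(&["doc-0", "doc-1", "doc-2"]);
    assert!(should_decommission);
    println!("{outcomes:?}");
}
```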
```diff
@@ -683,7 +728,7 @@ impl IngesterService for Ingester {
             .flat_map(|shards| shards.queue_ids())
             .collect();
 
-        self.close_shards_inner(&mut state_guard, &queue_ids).await;
+        state_guard.close_shards(&queue_ids).await;
 
         Ok(CloseShardsResponse {})
     }
@@ -733,7 +778,7 @@ impl IngesterService for Ingester {
                 }
             }
         }
-        self.check_decommissioning_status(&mut state_guard);
+        state_guard.check_decommissioning_status();
         let truncate_response = TruncateShardsResponse {};
         Ok(truncate_response)
     }
@@ -744,12 +789,7 @@
     ) -> IngestV2Result<DecommissionResponse> {
         info!("decommissioning ingester");
         let mut state_guard = self.state.write().await;
-
-        let queue_ids: Vec<QueueId> = state_guard.shards.keys().cloned().collect();
-        self.close_shards_inner(&mut state_guard, &queue_ids).await;
-
-        state_guard.status = IngesterStatus::Decommissioning;
-        self.check_decommissioning_status(&mut state_guard);
+        state_guard.decommission().await;
 
         Ok(DecommissionResponse {})
     }
@@ -1836,15 +1876,15 @@
         let (_ingester_ctx, ingester) = IngesterForTest::default().build().await;
         let mut state_guard = ingester.state.write().await;
 
-        ingester.check_decommissioning_status(&mut state_guard);
+        state_guard.check_decommissioning_status();
         assert_eq!(state_guard.status, IngesterStatus::Ready);
         assert_eq!(
             ingester.observation_rx.borrow().as_ref().unwrap().status(),
             IngesterStatus::Ready
         );
 
         state_guard.status = IngesterStatus::Decommissioning;
-        ingester.check_decommissioning_status(&mut state_guard);
+        state_guard.check_decommissioning_status();
         assert_eq!(state_guard.status, IngesterStatus::Decommissioned);
 
         state_guard.status = IngesterStatus::Decommissioning;
@@ -1855,13 +1895,13 @@
             queue_id_01.clone(),
             IngesterShard::new_solo(ShardState::Closed, Position::Eof, Position::Beginning),
         );
-        ingester.check_decommissioning_status(&mut state_guard);
+        state_guard.check_decommissioning_status();
         assert_eq!(state_guard.status, IngesterStatus::Decommissioning);
 
         let shard = state_guard.shards.get_mut(&queue_id_01).unwrap();
         shard.truncation_position_inclusive = Position::Eof;
 
-        ingester.check_decommissioning_status(&mut state_guard);
+        state_guard.check_decommissioning_status();
         assert_eq!(state_guard.status, IngesterStatus::Decommissioned);
         assert_eq!(
             ingester.observation_rx.borrow().as_ref().unwrap().status(),
```

quickwit/quickwit-ingest/src/ingest_v2/mrecordlog_utils.rs

Lines changed: 50 additions & 0 deletions
```diff
@@ -17,15 +17,65 @@
 // You should have received a copy of the GNU Affero General Public License
 // along with this program. If not, see <http://www.gnu.org/licenses/>.
 
+use std::io;
+use std::iter::once;
+
 use bytesize::ByteSize;
 use mrecordlog::error::{AppendError, MissingQueue};
 use mrecordlog::MultiRecordLog;
+use quickwit_proto::ingest::DocBatchV2;
 use quickwit_proto::types::{Position, QueueId};
 use tracing::warn;
 
 use super::mrecord::is_eof_mrecord;
 use crate::MRecord;
 
+#[derive(Debug, thiserror::Error)]
+pub(super) enum AppendDocBatchError {
+    #[error("IO error: {0}")]
+    Io(#[from] io::Error),
+    #[error("WAL queue `{0}` not found")]
+    QueueNotFound(QueueId),
+}
+
+/// Appends a document batch to the WAL queue.
+///
+/// # Panics
+///
+/// Panics if `doc_batch` is empty.
+pub(super) async fn append_doc_batch(
+    mrecordlog: &mut MultiRecordLog,
+    queue_id: &QueueId,
+    doc_batch: &DocBatchV2,
+    force_commit: bool,
+) -> Result<Position, AppendDocBatchError> {
+    let append_result = if force_commit {
+        let encoded_mrecords = doc_batch
+            .docs()
+            .map(|doc| MRecord::Doc(doc).encode())
+            .chain(once(MRecord::Commit.encode()));
+        mrecordlog
+            .append_records(&queue_id, None, encoded_mrecords)
+            .await
+    } else {
+        let encoded_mrecords = doc_batch.docs().map(|doc| MRecord::Doc(doc).encode());
+        mrecordlog
+            .append_records(&queue_id, None, encoded_mrecords)
+            .await
+    };
+    match append_result {
+        Ok(Some(offset)) => Ok(Position::from(offset)),
+        Ok(None) => panic!("`doc_batch` should not be empty"),
+        Err(AppendError::IoError(io_error)) => Err(AppendDocBatchError::Io(io_error)),
+        Err(AppendError::MissingQueue(queue_id)) => {
+            Err(AppendDocBatchError::QueueNotFound(queue_id))
+        }
+        Err(AppendError::Past) => {
+            panic!("`append_records` should be called with `None`")
+        }
+    }
+}
+
 /// Appends an EOF record to the queue if it is empty or the last record is not an EOF
 /// record.
 pub(super) async fn append_eof_record_if_necessary(
```
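`append_doc_batch` above encodes the batch in one of two ways: every document becomes a `Doc` MRecord, and with `force_commit` a single `Commit` marker is chained after the documents. A stand-alone sketch of that encoding rule with simplified stand-in types (the real encoding lives in the `MRecord` module):

```rust
// Simplified model of the encoding choice in `append_doc_batch`: each doc
// becomes a Doc record, and `force_commit` appends one Commit marker after
// the batch. `Record` is a stand-in, not the real MRecord encoding.
#[derive(Debug, PartialEq)]
enum Record {
    Doc(String),
    Commit,
}

fn encode_batch(docs: &[&str], force_commit: bool) -> Vec<Record> {
    let mut records: Vec<Record> = docs
        .iter()
        .map(|doc| Record::Doc((*doc).to_string()))
        .collect();
    if force_commit {
        records.push(Record::Commit);
    }
    records
}

fn main() {
    let records = encode_batch(&["doc-0", "doc-1"], true);
    assert_eq!(
        records,
        vec![
            Record::Doc("doc-0".to_string()),
            Record::Doc("doc-1".to_string()),
            Record::Commit,
        ]
    );
}
```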
