Commit b040ab2

WIP

1 parent 66d27b5 commit b040ab2

2 files changed: +316 -45 lines changed

quickwit/quickwit-ingest/src/ingest_v2/ingester.rs

Lines changed: 233 additions & 40 deletions
@@ -20,7 +20,6 @@
 use std::collections::hash_map::Entry;
 use std::collections::{HashMap, HashSet};
 use std::fmt;
-use std::iter::once;
 use std::path::Path;
 use std::sync::{Arc, Weak};
 use std::time::Duration;
@@ -56,8 +55,9 @@ use tracing::{debug, error, info, warn};
 use super::fetch::FetchStreamTask;
 use super::metrics::INGEST_V2_METRICS;
 use super::models::IngesterShard;
-use super::mrecord::MRecord;
-use super::mrecordlog_utils::{check_enough_capacity, force_delete_queue};
+use super::mrecordlog_utils::{
+    append_non_empty_doc_batch, check_enough_capacity, force_delete_queue, AppendDocBatchError,
+};
 use super::rate_meter::RateMeter;
 use super::replication::{
     ReplicationClient, ReplicationStreamTask, ReplicationStreamTaskHandle, ReplicationTask,
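Note: the helper imported above is defined in mrecordlog_utils.rs, presumably the second changed file, which is not shown in this view. As a rough sketch only, here is one plausible shape for it, reconstructed from its call sites in this diff; the doc/commit encoding mirrors the inline code deleted from `persist` below, while the imports, module paths, `thiserror` derive, and `AppendError` variant mapping are assumptions, not the actual commit contents:

use std::io;

use bytes::Bytes;
use mrecordlog::error::AppendError;
use mrecordlog::MultiRecordLog;
use quickwit_proto::ingest::DocBatchV2;
use quickwit_proto::types::{Position, QueueId};

use super::mrecord::MRecord;

#[derive(Debug, thiserror::Error)]
pub(super) enum AppendDocBatchError {
    #[error("IO error: {0}")]
    Io(#[from] io::Error),
    #[error("WAL queue `{0}` not found")]
    QueueNotFound(QueueId),
}

/// Appends a non-empty document batch to the WAL queue `queue_id` and returns the
/// position of the last appended record. Each document is encoded as an
/// `MRecord::Doc`; when `force_commit` is set, a trailing `MRecord::Commit` record
/// is chained in, as the inline code removed from `persist` did.
pub(super) async fn append_non_empty_doc_batch(
    mrecordlog: &mut MultiRecordLog,
    queue_id: &QueueId,
    doc_batch: &DocBatchV2,
    force_commit: bool,
) -> Result<Position, AppendDocBatchError> {
    let encoded_mrecords: Vec<Bytes> = if force_commit {
        doc_batch
            .docs()
            .map(|doc| MRecord::Doc(doc).encode())
            .chain(std::iter::once(MRecord::Commit.encode()))
            .collect()
    } else {
        doc_batch.docs().map(|doc| MRecord::Doc(doc).encode()).collect()
    };
    let last_offset = mrecordlog
        .append_records(queue_id, None, encoded_mrecords.into_iter())
        .await
        .map_err(|append_error| match append_error {
            // Assumed mapping: mrecordlog's append error is taken to mirror the
            // `IoError`/`MissingQueue` pattern of `TruncateError` and
            // `DeleteQueueError` visible elsewhere in this diff.
            AppendError::IoError(io_error) => AppendDocBatchError::Io(io_error),
            AppendError::MissingQueue(_) => {
                AppendDocBatchError::QueueNotFound(queue_id.clone())
            }
            other => AppendDocBatchError::Io(io::Error::new(
                io::ErrorKind::Other,
                other.to_string(),
            )),
        })?
        .expect("appended batch should be non-empty");
    Ok(Position::offset(last_offset))
}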
@@ -120,6 +120,10 @@ impl Ingester {
         replication_factor: usize,
     ) -> IngestV2Result<Self> {
         let self_node_id: NodeId = cluster.self_node_id().into();
+        info!(
+            "opening write-ahead log located at `{}`",
+            wal_dir_path.display()
+        );
         let mrecordlog = MultiRecordLog::open_with_prefs(
             wal_dir_path,
             mrecordlog::SyncPolicy::OnDelay(Duration::from_secs(5)),
@@ -232,10 +236,12 @@ impl Ingester {
                 num_closed_shards += 1;
             } else {
                 // The queue is empty: delete it.
-                force_delete_queue(&mut state_guard.mrecordlog, &queue_id)
-                    .await
-                    .expect("TODO: handle IO error");
-
+                if let Err(io_error) =
+                    force_delete_queue(&mut state_guard.mrecordlog, &queue_id).await
+                {
+                    error!("failed to delete shard `{queue_id}`: {io_error}");
+                    continue;
+                }
                 num_deleted_shards += 1;
             }
         }
@@ -297,6 +303,7 @@ impl Ingester {
             .await?;
 
         if let Err(error) = replication_client.init_replica(shard).await {
+            // TODO: Remove dangling queue from the WAL.
             error!("failed to initialize replica shard: {error}",);
             return Err(IngestV2Error::Internal(format!(
                 "failed to initialize replica shard: {error}"
@@ -395,6 +402,13 @@ impl Ingester {
         let mut persist_failures = Vec::new();
         let mut replicate_subrequests: HashMap<NodeId, Vec<ReplicateSubrequest>> = HashMap::new();
 
+        // Keep track of the shards that need to be closed following an IO error.
+        let mut shards_to_close: HashSet<QueueId> = HashSet::new();
+
+        // Keep track of dangling shards, i.e., shards for which there is no longer a
+        // corresponding queue in the WAL and which should therefore be deleted.
+        let mut shards_to_delete: HashSet<QueueId> = HashSet::new();
+
         let commit_type = persist_request.commit_type();
         let force_commit = commit_type == CommitTypeV2::Force;
         let leader_id: NodeId = persist_request.leader_id.into();
@@ -515,27 +529,43 @@ impl Ingester {
 
             rate_meter.update(batch_num_bytes);
 
-            let current_position_inclusive: Position = if force_commit {
-                let encoded_mrecords = doc_batch
-                    .docs()
-                    .map(|doc| MRecord::Doc(doc).encode())
-                    .chain(once(MRecord::Commit.encode()));
-                state_guard
-                    .mrecordlog
-                    .append_records(&queue_id, None, encoded_mrecords)
-                    .await
-                    .expect("TODO") // TODO: Io error, close shard?
-            } else {
-                let encoded_mrecords = doc_batch.docs().map(|doc| MRecord::Doc(doc).encode());
-                state_guard
-                    .mrecordlog
-                    .append_records(&queue_id, None, encoded_mrecords)
-                    .await
-                    .expect("TODO") // TODO: Io error, close shard?
-            }
-            .map(Position::offset)
-            .expect("records should not be empty");
+            let append_result = append_non_empty_doc_batch(
+                &mut state_guard.mrecordlog,
+                &queue_id,
+                &doc_batch,
+                force_commit,
+            )
+            .await;
 
+            let current_position_inclusive = match append_result {
+                Ok(current_position_inclusive) => current_position_inclusive,
+                Err(append_error) => {
+                    let reason = match &append_error {
+                        AppendDocBatchError::Io(io_error) => {
+                            error!("failed to persist records to shard `{queue_id}`: {io_error}");
+                            shards_to_close.insert(queue_id);
+                            PersistFailureReason::ShardClosed
+                        }
+                        AppendDocBatchError::QueueNotFound(_) => {
+                            error!(
+                                "failed to persist records to shard `{queue_id}`: WAL queue not \
+                                 found"
+                            );
+                            shards_to_delete.insert(queue_id);
+                            PersistFailureReason::ShardNotFound
+                        }
+                    };
+                    let persist_failure = PersistFailure {
+                        subrequest_id: subrequest.subrequest_id,
+                        index_uid: subrequest.index_uid,
+                        source_id: subrequest.source_id,
+                        shard_id: subrequest.shard_id,
+                        reason: reason as i32,
+                    };
+                    persist_failures.push(persist_failure);
+                    continue;
+                }
+            };
             // It's more precise to compute the new usage from the current usage + the requested
             // capacity than from continuously summing up the requested capacities, which are
             // approximations.
@@ -583,6 +613,28 @@ impl Ingester {
                 persist_successes.push(persist_success);
             }
         }
+        if !shards_to_close.is_empty() {
+            for queue_id in &shards_to_close {
+                let shard = state_guard
+                    .shards
+                    .get_mut(queue_id)
+                    .expect("shard should exist");
+
+                shard.shard_state = ShardState::Closed;
+                shard.notify_shard_status();
+            }
+            info!(
+                "closed {} shard(s) following IO error(s)",
+                shards_to_close.len()
+            );
+        }
+        if !shards_to_delete.is_empty() {
+            for queue_id in &shards_to_delete {
+                state_guard.shards.remove(queue_id);
+                state_guard.rate_trackers.remove(queue_id);
+            }
+            info!("deleted {} dangling shard(s)", shards_to_delete.len());
+        }
         if replicate_subrequests.is_empty() {
             let leader_id = self.self_node_id.to_string();
             let persist_response = PersistResponse {
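The two blocks added above complete the new persist failure policy: an IO error while appending closes the affected shard and surfaces PersistFailureReason::ShardClosed, while a missing WAL queue drops the dangling shard (and its rate trackers) from the in-memory state and surfaces PersistFailureReason::ShardNotFound. Deferring the close/delete work until after the subrequest loop batches the state mutations, and the HashSets deduplicate repeated failures against the same shard.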
@@ -1042,29 +1094,28 @@ impl IngesterState {
                 shard.truncation_position_inclusive = truncate_up_to_position_inclusive;
             }
             Err(TruncateError::MissingQueue(_)) => {
-                warn!("failed to truncate WAL queue `{queue_id}`: queue does not exist");
+                error!("failed to truncate shard `{queue_id}`: WAL queue not found");
+                self.shards.remove(queue_id);
+                self.rate_trackers.remove(queue_id);
+                info!("deleted dangling shard `{queue_id}`");
             }
-            Err(error) => {
-                error!(%error, "failed to truncate WAL queue `{queue_id}`");
+            Err(TruncateError::IoError(io_error)) => {
+                error!("failed to truncate shard `{queue_id}`: {io_error}");
             }
         };
     }
 
     /// Deletes the shard identified by `queue_id` from the ingester state. It removes the
-    /// mrecordlog queue first and then, if the operation is successful, removes the shard.
+    /// mrecordlog queue first and then removes the associated in-memory shard and rate trackers.
     async fn delete_shard(&mut self, queue_id: &QueueId) {
         match self.mrecordlog.delete_queue(queue_id).await {
-            Ok(_) => {
+            Ok(_) | Err(DeleteQueueError::MissingQueue(_)) => {
                 self.shards.remove(queue_id);
                 self.rate_trackers.remove(queue_id);
-
-                info!("deleted shard `{queue_id}` from ingester");
+                info!("deleted shard `{queue_id}`");
             }
-            Err(DeleteQueueError::MissingQueue(_)) => {
-                // The shard has already been deleted.
-            }
-            Err(DeleteQueueError::IoError(_)) => {
-                panic!("TODO: handle IO error")
+            Err(DeleteQueueError::IoError(io_error)) => {
+                error!("failed to delete shard `{queue_id}`: {io_error}");
             }
         };
     }
@@ -1152,6 +1203,7 @@ mod tests {
     use crate::ingest_v2::broadcast::ShardInfos;
     use crate::ingest_v2::fetch::tests::{into_fetch_eof, into_fetch_payload};
    use crate::ingest_v2::test_utils::MultiRecordLogTestExt;
+    use crate::MRecord;
 
     pub(super) struct IngesterForTest {
         node_id: NodeId,
@@ -1501,6 +1553,109 @@
         );
     }
 
+    #[tokio::test]
+    async fn test_ingester_persist_empty() {
+        let (ingester_ctx, mut ingester) = IngesterForTest::default().build().await;
+
+        let persist_request = PersistRequest {
+            leader_id: ingester_ctx.node_id.to_string(),
+            commit_type: CommitTypeV2::Force as i32,
+            subrequests: Vec::new(),
+        };
+        let persist_response = ingester.persist(persist_request).await.unwrap();
+        assert_eq!(persist_response.leader_id, "test-ingester");
+        assert_eq!(persist_response.successes.len(), 0);
+        assert_eq!(persist_response.failures.len(), 0);
+
+        let persist_request = PersistRequest {
+            leader_id: "test-ingester".to_string(),
+            commit_type: CommitTypeV2::Force as i32,
+            subrequests: vec![PersistSubrequest {
+                subrequest_id: 0,
+                index_uid: "test-index:0".to_string(),
+                source_id: "test-source".to_string(),
+                shard_id: 1,
+                doc_batch: None,
+            }],
+        };
+
+        let init_shards_request = InitShardsRequest {
+            shards: vec![Shard {
+                index_uid: "test-index:0".to_string(),
+                source_id: "test-source".to_string(),
+                shard_id: 1,
+                shard_state: ShardState::Open as i32,
+                leader_id: ingester_ctx.node_id.to_string(),
+                ..Default::default()
+            }],
+        };
+        ingester.init_shards(init_shards_request).await.unwrap();
+
+        let persist_response = ingester.persist(persist_request).await.unwrap();
+        assert_eq!(persist_response.leader_id, "test-ingester");
+        assert_eq!(persist_response.successes.len(), 1);
+        assert_eq!(persist_response.failures.len(), 0);
+
+        let persist_success = &persist_response.successes[0];
+        assert_eq!(persist_success.subrequest_id, 0);
+        assert_eq!(persist_success.index_uid, "test-index:0");
+        assert_eq!(persist_success.source_id, "test-source");
+        assert_eq!(persist_success.shard_id, 1);
+        assert_eq!(
+            persist_success.replication_position_inclusive,
+            Some(Position::Beginning)
+        );
+    }
+
+    #[tokio::test]
+    async fn test_ingester_persist_deletes_dangling_shard() {
+        let (_ingester_ctx, mut ingester) = IngesterForTest::default().build().await;
+
+        let mut state_guard = ingester.state.write().await;
+        let queue_id = queue_id("test-index:0", "test-source", 1);
+        let solo_shard =
+            IngesterShard::new_solo(ShardState::Open, Position::Beginning, Position::Beginning);
+        state_guard.shards.insert(queue_id.clone(), solo_shard);
+
+        let rate_limiter = RateLimiter::from_settings(RateLimiterSettings::default());
+        let rate_meter = RateMeter::default();
+        state_guard
+            .rate_trackers
+            .insert(queue_id.clone(), (rate_limiter, rate_meter));
+
+        drop(state_guard);
+
+        let persist_request = PersistRequest {
+            leader_id: "test-ingester".to_string(),
+            commit_type: CommitTypeV2::Force as i32,
+            subrequests: vec![PersistSubrequest {
+                subrequest_id: 0,
+                index_uid: "test-index:0".to_string(),
+                source_id: "test-source".to_string(),
+                shard_id: 1,
+                doc_batch: Some(DocBatchV2::for_test(["test-doc-foo"])),
+            }],
+        };
+        let persist_response = ingester.persist(persist_request).await.unwrap();
+        assert_eq!(persist_response.leader_id, "test-ingester");
+        assert_eq!(persist_response.successes.len(), 0);
+        assert_eq!(persist_response.failures.len(), 1);
+
+        let persist_failure = &persist_response.failures[0];
+        assert_eq!(persist_failure.subrequest_id, 0);
+        assert_eq!(persist_failure.index_uid, "test-index:0");
+        assert_eq!(persist_failure.source_id, "test-source");
+        assert_eq!(persist_failure.shard_id, 1);
+        assert_eq!(
+            persist_failure.reason(),
+            PersistFailureReason::ShardNotFound
+        );
+
+        let state_guard = ingester.state.read().await;
+        assert_eq!(state_guard.shards.len(), 0);
+        assert_eq!(state_guard.rate_trackers.len(), 0);
+    }
+
     #[tokio::test]
     async fn test_ingester_persist_replicate() {
         let (leader_ctx, mut leader) = IngesterForTest::default()
@@ -2144,7 +2299,7 @@ mod tests {
     }
 
     #[tokio::test]
-    async fn test_ingester_truncate() {
+    async fn test_ingester_truncate_shards() {
         let (ingester_ctx, mut ingester) = IngesterForTest::default().build().await;
 
         let shard_01 = Shard {
@@ -2245,6 +2400,44 @@
         assert!(!state_guard.mrecordlog.queue_exists(&queue_id_02));
     }
 
+    #[tokio::test]
+    async fn test_ingester_truncate_shards_deletes_dangling_shards() {
+        let (ingester_ctx, mut ingester) = IngesterForTest::default().build().await;
+
+        let queue_id = queue_id("test-index:0", "test-source", 1);
+
+        let mut state_guard = ingester.state.write().await;
+        let solo_shard =
+            IngesterShard::new_solo(ShardState::Open, Position::Beginning, Position::Beginning);
+        state_guard.shards.insert(queue_id.clone(), solo_shard);
+
+        let rate_limiter = RateLimiter::from_settings(RateLimiterSettings::default());
+        let rate_meter = RateMeter::default();
+        state_guard
+            .rate_trackers
+            .insert(queue_id.clone(), (rate_limiter, rate_meter));
+
+        drop(state_guard);
+
+        let truncate_shards_request = TruncateShardsRequest {
+            ingester_id: ingester_ctx.node_id.to_string(),
+            subrequests: vec![TruncateShardsSubrequest {
+                index_uid: "test-index:0".to_string(),
+                source_id: "test-source".to_string(),
+                shard_id: 1,
+                truncate_up_to_position_inclusive: Some(Position::offset(0u64)),
+            }],
+        };
+        ingester
+            .truncate_shards(truncate_shards_request.clone())
+            .await
+            .unwrap();
+
+        let state_guard = ingester.state.read().await;
+        assert_eq!(state_guard.shards.len(), 0);
+        assert_eq!(state_guard.rate_trackers.len(), 0);
+    }
+
     #[tokio::test]
     async fn test_ingester_retain_shards() {
         let (_ingester_ctx, mut ingester) = IngesterForTest::default().build().await;
