@@ -139,12 +139,16 @@ fn get_sources_to_schedule(model: &ControlPlaneModel) -> Vec<SourceToSchedule> {
139
139
}
140
140
SourceType :: IngestV2 => {
141
141
// Expect: the source should exist since we just read it from `get_source_configs`.
142
+ // Note that we keep all shards, including Closed shards:
143
+ // A closed shards still needs to be indexed.
142
144
let shard_ids: Vec < ShardId > = model
143
145
. list_shards_for_source ( & source_uid)
144
146
. expect ( "source should exist" )
145
147
. map ( |shard| shard. shard_id )
146
148
. collect ( ) ;
147
-
149
+ if shard_ids. is_empty ( ) {
150
+ continue ;
151
+ }
148
152
sources. push ( SourceToSchedule {
149
153
source_uid,
150
154
source_type : SourceToScheduleType :: Sharded {
@@ -695,7 +699,23 @@ mod tests {
695
699
max_num_pipelines_per_indexer : NonZeroUsize :: new ( 2 ) . unwrap ( ) ,
696
700
desired_num_pipelines : NonZeroUsize :: new ( 2 ) . unwrap ( ) ,
697
701
enabled : true ,
698
- // ingest v1
702
+ // ingest v2
703
+ source_params : SourceParams :: Ingest ,
704
+ transform_config : None ,
705
+ input_format : Default :: default ( ) ,
706
+ } ,
707
+ )
708
+ . unwrap ( ) ;
709
+ // ingest v2 without any open shard is skipped.
710
+ model
711
+ . add_source (
712
+ & index_uid,
713
+ SourceConfig {
714
+ source_id : "ingest_v2_without_shard" . to_string ( ) ,
715
+ max_num_pipelines_per_indexer : NonZeroUsize :: new ( 2 ) . unwrap ( ) ,
716
+ desired_num_pipelines : NonZeroUsize :: new ( 2 ) . unwrap ( ) ,
717
+ enabled : true ,
718
+ // ingest v2
699
719
source_params : SourceParams :: Ingest ,
700
720
transform_config : None ,
701
721
input_format : Default :: default ( ) ,
@@ -717,6 +737,14 @@ mod tests {
717
737
} ,
718
738
)
719
739
. unwrap ( ) ;
740
+ let shard = Shard {
741
+ index_uid : index_uid. to_string ( ) ,
742
+ source_id : "ingest_v2" . to_string ( ) ,
743
+ shard_id : 17 ,
744
+ shard_state : ShardState :: Open as i32 ,
745
+ ..Default :: default ( )
746
+ } ;
747
+ model. insert_newly_opened_shards ( & index_uid, & "ingest_v2" . to_string ( ) , vec ! [ shard] , 18 ) ;
720
748
let shards: Vec < SourceToSchedule > = get_sources_to_schedule ( & model) ;
721
749
assert_eq ! ( shards. len( ) , 3 ) ;
722
750
}
@@ -816,6 +844,7 @@ mod tests {
816
844
817
845
use quickwit_config:: SourceInputFormat ;
818
846
use quickwit_proto:: indexing:: mcpu;
847
+ use quickwit_proto:: ingest:: { Shard , ShardState } ;
819
848
820
849
fn kafka_source_params_for_test ( ) -> SourceParams {
821
850
SourceParams :: Kafka ( KafkaSourceParams {
0 commit comments