@@ -19,11 +19,17 @@
 def upload_batch_inferences(
     client: Openlayer,
     inference_pipeline_id: str,
-    dataset_df: pd.DataFrame,
     config: data_stream_params.Config,
+    dataset_df: Optional[pd.DataFrame] = None,
+    dataset_path: Optional[str] = None,
     storage_type: Optional[StorageType] = None,
     merge: bool = False,
 ) -> None:
     """Uploads a batch of inferences to the Openlayer platform."""
+    if dataset_df is None and dataset_path is None:
+        raise ValueError("Either dataset_df or dataset_path must be provided.")
+    if dataset_df is not None and dataset_path is not None:
+        raise ValueError("Only one of dataset_df or dataset_path should be provided.")
+
     uploader = _upload.Uploader(client, storage_type)
     object_name = f"batch_data_{time.time()}_{inference_pipeline_id}.tar.gz"
@@ -35,8 +41,11 @@ def upload_batch_inferences(
 
     # Write dataset and config to temp directory
     with tempfile.TemporaryDirectory() as tmp_dir:
-        temp_file_path = f"{tmp_dir}/dataset.csv"
-        dataset_df.to_csv(temp_file_path, index=False)
+        if dataset_df is not None:
+            temp_file_path = f"{tmp_dir}/dataset.csv"
+            dataset_df.to_csv(temp_file_path, index=False)
+        else:
+            temp_file_path = dataset_path
 
     # Copy relevant files to tmp dir
     config["label"] = "production"
@@ -47,7 +56,11 @@ def upload_batch_inferences(
 
     tar_file_path = os.path.join(tmp_dir, object_name)
     with tarfile.open(tar_file_path, mode="w:gz") as tar:
-        tar.add(tmp_dir, arcname=os.path.basename("monitoring_data"))
+        tar.add(temp_file_path, arcname=os.path.basename("dataset.csv"))
+        tar.add(
+            f"{tmp_dir}/dataset_config.yaml",
+            arcname=os.path.basename("dataset_config.yaml"),
+        )
 
     # Upload to storage
     uploader.upload(
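For reviewers trying the new call pattern, here is a minimal usage sketch. It is not part of this diff: the import path, client construction, and config fields are assumptions inferred from the signature above. Exactly one of dataset_df or dataset_path must be passed.

import os

import pandas as pd

# Import paths assumed; adjust to wherever Openlayer and
# upload_batch_inferences are actually exported in this SDK.
from openlayer import Openlayer
from openlayer.lib.data import upload_batch_inferences

client = Openlayer(api_key=os.environ["OPENLAYER_API_KEY"])  # assumed auth setup

df = pd.DataFrame({"input": ["What is 2 + 2?"], "output": ["4"]})

# Option 1: pass an in-memory DataFrame (written to a temp CSV internally).
upload_batch_inferences(
    client=client,
    inference_pipeline_id="YOUR_INFERENCE_PIPELINE_ID",  # hypothetical ID
    config={"output_column_name": "output"},  # hypothetical config fields
    dataset_df=df,
)

# Option 2: pass a CSV already on disk (added to the tarball directly,
# skipping the DataFrame-to-CSV round trip).
upload_batch_inferences(
    client=client,
    inference_pipeline_id="YOUR_INFERENCE_PIPELINE_ID",
    config={"output_column_name": "output"},
    dataset_path="data/batch_inferences.csv",  # hypothetical path
)

# Passing both (or neither) raises ValueError per the checks added in this diff.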