From 1ed3d30347b3ed391698a83a591eb2aaa981e895 Mon Sep 17 00:00:00 2001 From: Alejandro Ponce Date: Fri, 21 Feb 2025 15:03:43 +0200 Subject: [PATCH] FIM related fixes for alerts and DB There are a couple of fixes for this PR: 1. Return conversations with empty answers in the special case of FIM. Sometimes FIM doesn't give us an answer 2. Use the function to deduplicate alerts for all type of alerts 3. Fix a SQL query in which there were some messages being filtered out 4. Record FIM interactions in DB. They were being skipped. --- src/codegate/api/v1_processing.py | 16 +++++++++------- src/codegate/db/connection.py | 3 +-- src/codegate/providers/base.py | 20 +++++++++++++++++++- 3 files changed, 29 insertions(+), 10 deletions(-) diff --git a/src/codegate/api/v1_processing.py b/src/codegate/api/v1_processing.py index 6606a882..cb2c0c9b 100644 --- a/src/codegate/api/v1_processing.py +++ b/src/codegate/api/v1_processing.py @@ -16,6 +16,7 @@ PartialQuestionAnswer, PartialQuestions, QuestionAnswer, + QuestionType, TokenUsageAggregate, TokenUsageByModel, ) @@ -384,8 +385,13 @@ async def match_conversations( selected_partial_qa = partial_qa break - # check if we have a question and answer, otherwise do not add it - if selected_partial_qa and selected_partial_qa.answer is not None: + # check if we have a question and answer, otherwise do not add it + # if the question is a FIM question, we should add it even if there is no answer + # not add Chat questions without answers + if selected_partial_qa and ( + selected_partial_qa.answer is not None + or selected_partial_qa.partial_questions.type == QuestionType.fim + ): # if we don't have a first question, set it. We will use it # to set the conversation timestamp and provider first_partial_qa = first_partial_qa or selected_partial_qa @@ -396,7 +402,7 @@ async def match_conversations( alerts.extend(deduped_alerts) token_usage_agg.add_model_token_usage(selected_partial_qa.model_token_usage) - # only add conversation if we have some answers + # if we have a conversation with at least one question and answer if len(questions_answers) > 0 and first_partial_qa is not None: if token_usage_agg.token_usage.input_tokens == 0: token_usage_agg = None @@ -435,7 +441,6 @@ async def parse_messages_in_conversations( Get all the messages from the database and return them as a list of conversations. """ partial_question_answers = await _process_prompt_output_to_partial_qa(prompts_outputs) - conversations, map_q_id_to_conversation = await match_conversations(partial_question_answers) return conversations, map_q_id_to_conversation @@ -510,9 +515,6 @@ async def remove_duplicate_alerts(alerts: List[v1_models.Alert]) -> List[v1_mode for alert in sorted( alerts, key=lambda x: x.timestamp, reverse=True ): # Sort alerts by timestamp descending - if alert.trigger_type != "codegate-secrets": - unique_alerts.append(alert) - continue # Extract trigger string content until "Context" trigger_string_content = alert.trigger_string.split("Context")[0] diff --git a/src/codegate/db/connection.py b/src/codegate/db/connection.py index 78a2d607..2d56fccd 100644 --- a/src/codegate/db/connection.py +++ b/src/codegate/db/connection.py @@ -610,7 +610,7 @@ async def get_prompts_with_output_alerts_usage_by_workspace_id( LEFT JOIN outputs o ON p.id = o.prompt_id LEFT JOIN alerts a ON p.id = a.prompt_id WHERE p.workspace_id = :workspace_id - AND a.trigger_category LIKE :trigger_category + AND (a.trigger_category = :trigger_category OR a.trigger_category is NULL) ORDER BY o.timestamp DESC, a.timestamp DESC """ # noqa: E501 ) @@ -622,7 +622,6 @@ async def get_prompts_with_output_alerts_usage_by_workspace_id( IntermediatePromptWithOutputUsageAlerts, sql, conditions, should_raise=True ) ) - prompts_dict: Dict[str, GetPromptWithOutputsRow] = {} for row in rows: prompt_id = row.prompt_id diff --git a/src/codegate/providers/base.py b/src/codegate/providers/base.py index d22afcc0..452fe08b 100644 --- a/src/codegate/providers/base.py +++ b/src/codegate/providers/base.py @@ -96,6 +96,24 @@ def _get_base_url(self) -> str: config = Config.get_config() return config.provider_urls.get(self.provider_route_name) if config else "" + async def process_stream_no_pipeline( + self, stream: AsyncIterator[ModelResponse], context: PipelineContext + ) -> AsyncIterator[ModelResponse]: + """ + Process a stream when there is no pipeline. + This is needed to record the output stream chunks for FIM. + """ + try: + async for chunk in stream: + context.add_output(chunk) + yield chunk + except Exception as e: + # Log exception and stop processing + logger.error(f"Error processing stream: {e}") + raise e + finally: + await self._db_recorder.record_context(context) + async def _run_output_stream_pipeline( self, input_context: PipelineContext, @@ -121,7 +139,7 @@ async def _run_output_stream_pipeline( and self.provider_route_name != "anthropic" ): logger.info("No output pipeline steps configured, passing through") - return model_stream + return self.process_stream_no_pipeline(model_stream, input_context) normalized_stream = self._output_normalizer.normalize_streaming(model_stream)