Skip to content

Feat: OLAP Table for CEL Eval Failures #2012

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 14 commits into
base: main
Choose a base branch
from
19 changes: 19 additions & 0 deletions api/v1/server/handlers/v1/cel/debug.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
package celv1

import (
"fmt"

"github.com/hatchet-dev/hatchet/api/v1/server/oas/gen"
"github.com/hatchet-dev/hatchet/api/v1/server/oas/transformers/v1"
"github.com/hatchet-dev/hatchet/internal/cel"
"github.com/hatchet-dev/hatchet/pkg/repository/postgres/dbsqlc"
"github.com/hatchet-dev/hatchet/pkg/repository/v1/sqlcv1"
"github.com/labstack/echo/v4"
)

func (c *V1CELService) V1CelDebug(ctx echo.Context, request gen.V1CelDebugRequestObject) (gen.V1CelDebugResponseObject, error) {
tenant := ctx.Get("tenant").(*dbsqlc.Tenant)

additionalMetadata := make(map[string]interface{})
if request.Body.AdditionalMetadata != nil {
additionalMetadata = *request.Body.AdditionalMetadata
Expand All @@ -25,6 +31,19 @@ func (c *V1CELService) V1CelDebug(ctx echo.Context, request gen.V1CelDebugReques
),
)

if err != nil {
ingestErr := c.config.Ingestor.IngestCELEvaluationFailure(
ctx.Request().Context(),
tenant.ID.String(),
err.Error(),
sqlcv1.V1CelEvaluationFailureSourceDEBUG,
)

if ingestErr != nil {
return nil, fmt.Errorf("failed to ingest CEL evaluation failure: %w", ingestErr)
}
}

var output *bool
var errorMessage *string

Expand Down
25 changes: 25 additions & 0 deletions cmd/hatchet-migrate/migrate/migrations/20250725183217_v1_0_30.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
-- +goose Up
-- +goose StatementBegin
-- Enumerates where a CEL evaluation failure was observed.
CREATE TYPE v1_cel_evaluation_failure_source AS ENUM ('FILTER', 'WEBHOOK', 'DEBUG');

-- One row per recorded CEL evaluation failure, scoped to a tenant.
-- The table is range-partitioned on inserted_at; it has no default partition
-- here, so a partition covering the row's inserted_at must exist before rows
-- can be inserted.
CREATE TABLE v1_cel_evaluation_failures_olap (
id BIGINT NOT NULL GENERATED ALWAYS AS IDENTITY,

tenant_id UUID NOT NULL,

source v1_cel_evaluation_failure_source NOT NULL,

-- The failure text as reported by the evaluator.
error TEXT NOT NULL,

inserted_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,

-- PostgreSQL requires the partition key (inserted_at) to be part of the
-- primary key of a partitioned table.
PRIMARY KEY (inserted_at, id)
) PARTITION BY RANGE(inserted_at);
-- +goose StatementEnd

-- +goose Down
-- +goose StatementBegin
-- Drop the table before the enum type, since the table's source column uses it.
DROP TABLE v1_cel_evaluation_failures_olap;
DROP TYPE v1_cel_evaluation_failure_source;
-- +goose StatementEnd
21 changes: 21 additions & 0 deletions internal/services/controllers/v1/olap/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,11 +337,32 @@ func (tc *OLAPControllerImpl) handleBufferedMsgs(tenantId, msgId string, payload
return tc.handleCreateMonitoringEvent(context.Background(), tenantId, payloads)
case "created-event-trigger":
return tc.handleCreateEventTriggers(context.Background(), tenantId, payloads)
case "cel-evaluation-failure":
return tc.handleCelEvaluationFailure(context.Background(), tenantId, payloads)
}

return fmt.Errorf("unknown message id: %s", msgId)
}

// handleCelEvaluationFailure decodes buffered "cel-evaluation-failure" payloads,
// keeps only the failures selected by tc.sample (keyed on the error message),
// and stores the kept failures in the OLAP repository for the given tenant.
//
// If no failure survives sampling (or the payloads carry none), the function
// returns nil without calling the repository, avoiding an empty write.
func (tc *OLAPControllerImpl) handleCelEvaluationFailure(ctx context.Context, tenantId string, payloads [][]byte) error {
	msgs := msgqueue.JSONConvert[tasktypes.CELEvaluationFailures](payloads)

	var failures []v1.CELEvaluationFailure

	for _, msg := range msgs {
		for _, failure := range msg.Failures {
			if !tc.sample(failure.ErrorMessage) {
				tc.l.Debug().Msgf("skipping CEL evaluation failure %s for source %s", failure.ErrorMessage, failure.Source)
				continue
			}

			failures = append(failures, failure)
		}
	}

	// Nothing was sampled in: skip the repository round trip entirely.
	if len(failures) == 0 {
		return nil
	}

	return tc.repo.OLAP().StoreCELEvaluationFailures(ctx, tenantId, failures)
}

// handleCreatedTask is responsible for flushing a created task to the OLAP repository
func (tc *OLAPControllerImpl) handleCreatedTask(ctx context.Context, tenantId string, payloads [][]byte) error {
createTaskOpts := make([]*sqlcv1.V1Task, 0)
Expand Down
15 changes: 15 additions & 0 deletions internal/services/controllers/v1/task/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -938,6 +938,21 @@ func (tc *TasksControllerImpl) handleProcessUserEventTrigger(ctx context.Context
return fmt.Errorf("could not trigger tasks from events: %w", err)
}

evalFailuresMsg, err := tasktypes.CELEvaluationFailureMessage(
tenantId,
result.CELEvaluationFailures,
)

if err != nil {
return fmt.Errorf("could not create CEL evaluation failure message: %w", err)
}

err = tc.pubBuffer.Pub(ctx, msgqueue.OLAP_QUEUE, evalFailuresMsg, false)

if err != nil {
return fmt.Errorf("could not deliver CEL evaluation failure message: %w", err)
}

eg := &errgroup.Group{}

eg.Go(func() error {
Expand Down
11 changes: 11 additions & 0 deletions internal/services/ingestor/ingestor.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/hatchet-dev/hatchet/pkg/repository/postgres/dbsqlc"
"github.com/hatchet-dev/hatchet/pkg/repository/postgres/sqlchelpers"
v1 "github.com/hatchet-dev/hatchet/pkg/repository/v1"
"github.com/hatchet-dev/hatchet/pkg/repository/v1/sqlcv1"
"github.com/hatchet-dev/hatchet/pkg/validator"
)

Expand All @@ -25,6 +26,7 @@ type Ingestor interface {
IngestEvent(ctx context.Context, tenant *dbsqlc.Tenant, eventName string, data []byte, metadata []byte, priority *int32, scope *string) (*dbsqlc.Event, error)
BulkIngestEvent(ctx context.Context, tenant *dbsqlc.Tenant, eventOpts []*repository.CreateEventOpts) ([]*dbsqlc.Event, error)
IngestReplayedEvent(ctx context.Context, tenant *dbsqlc.Tenant, replayedEvent *dbsqlc.Event) (*dbsqlc.Event, error)
IngestCELEvaluationFailure(ctx context.Context, tenantId, errorText string, source sqlcv1.V1CelEvaluationFailureSource) error
}

type IngestorOptFunc func(*IngestorOpts)
Expand Down Expand Up @@ -339,3 +341,12 @@ func eventToTask(e *dbsqlc.Event) *msgqueue.Message {
Retries: 3,
}
}

// IngestCELEvaluationFailure is the exported Ingestor entry point for reporting
// a single CEL evaluation failure. It forwards its arguments unchanged to
// ingestCELEvaluationFailure and returns that call's error.
func (i *IngestorImpl) IngestCELEvaluationFailure(ctx context.Context, tenantId, errorText string, source sqlcv1.V1CelEvaluationFailureSource) error {
	return i.ingestCELEvaluationFailure(ctx, tenantId, errorText, source)
}
26 changes: 26 additions & 0 deletions internal/services/ingestor/ingestor_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
"github.com/hatchet-dev/hatchet/pkg/repository"
"github.com/hatchet-dev/hatchet/pkg/repository/postgres/dbsqlc"
"github.com/hatchet-dev/hatchet/pkg/repository/postgres/sqlchelpers"
v1 "github.com/hatchet-dev/hatchet/pkg/repository/v1"
"github.com/hatchet-dev/hatchet/pkg/repository/v1/sqlcv1"
)

type EventResult struct {
Expand Down Expand Up @@ -156,3 +158,27 @@ func eventToTaskV1(tenantId, eventExternalId, key string, data, additionalMeta [
payloadTyped,
)
}

// ingestCELEvaluationFailure packages one CEL evaluation failure (source plus
// error text) into a "cel-evaluation-failure" message for the tenant and sends
// it on the OLAP queue. Errors from building or sending the message are
// wrapped and returned.
func (i *IngestorImpl) ingestCELEvaluationFailure(ctx context.Context, tenantId, errorText string, source sqlcv1.V1CelEvaluationFailureSource) error {
	failure := v1.CELEvaluationFailure{
		Source:       source,
		ErrorMessage: errorText,
	}

	olapMsg, buildErr := tasktypes.CELEvaluationFailureMessage(tenantId, []v1.CELEvaluationFailure{failure})
	if buildErr != nil {
		return fmt.Errorf("failed to create CEL evaluation failure message: %w", buildErr)
	}

	if sendErr := i.mqv1.SendMessage(ctx, msgqueue.OLAP_QUEUE, olapMsg); sendErr != nil {
		return fmt.Errorf("failed to send CEL evaluation failure message: %w", sendErr)
	}

	return nil
}
16 changes: 16 additions & 0 deletions internal/services/shared/tasktypes/v1/olap.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,22 @@ import (
"github.com/hatchet-dev/hatchet/pkg/repository/v1/sqlcv1"
)

// CELEvaluationFailures is the payload of a "cel-evaluation-failure" message:
// a batch of CEL evaluation failures.
type CELEvaluationFailures struct {
// Failures has no struct tag, so it is serialized under its Go field name.
Failures []v1.CELEvaluationFailure
}

// CELEvaluationFailureMessage builds a tenant-scoped message with the ID
// "cel-evaluation-failure" whose payload is the given failures wrapped in a
// CELEvaluationFailures value. It returns whatever msgqueue.NewTenantMessage
// returns.
func CELEvaluationFailureMessage(tenantId string, failures []v1.CELEvaluationFailure) (*msgqueue.Message, error) {
	payload := CELEvaluationFailures{Failures: failures}

	// NOTE(review): the two boolean flags are positional options of
	// msgqueue.NewTenantMessage; confirm their meaning against its signature.
	return msgqueue.NewTenantMessage(tenantId, "cel-evaluation-failure", false, true, payload)
}

type CreatedTaskPayload struct {
*sqlcv1.V1Task
}
Expand Down
23 changes: 23 additions & 0 deletions pkg/repository/v1/olap.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,8 @@ type OLAPRepository interface {

GetDagDurationsByDagIds(ctx context.Context, tenantId string, dagIds []int64, dagInsertedAts []pgtype.Timestamptz, readableStatuses []sqlcv1.V1ReadableStatusOlap) (map[string]*sqlcv1.GetDagDurationsByDagIdsRow, error)
GetTaskDurationsByTaskIds(ctx context.Context, tenantId string, taskIds []int64, taskInsertedAts []pgtype.Timestamptz, readableStatuses []sqlcv1.V1ReadableStatusOlap) (map[int64]*sqlcv1.GetTaskDurationsByTaskIdsRow, error)

StoreCELEvaluationFailures(ctx context.Context, tenantId string, failures []CELEvaluationFailure) error
}

type OLAPRepositoryImpl struct {
Expand Down Expand Up @@ -1730,3 +1732,24 @@ func (r *OLAPRepositoryImpl) GetTaskDurationsByTaskIds(ctx context.Context, tena

return taskDurations, nil
}

// CELEvaluationFailure describes a single failed CEL evaluation: where it was
// observed and the error text that was produced.
type CELEvaluationFailure struct {
// Source identifies the origin of the failure; serialized as "source".
Source sqlcv1.V1CelEvaluationFailureSource `json:"source"`
// ErrorMessage is the failure text; serialized as "error_message".
ErrorMessage string `json:"error_message"`
}

// StoreCELEvaluationFailures persists a batch of CEL evaluation failures for
// the given tenant. The failures are split into two parallel string slices
// (sources and error messages, same order and length) and handed to the
// StoreCELEvaluationFailures query in a single call.
//
// An empty batch is a no-op: the function returns nil without touching the
// database.
func (r *OLAPRepositoryImpl) StoreCELEvaluationFailures(ctx context.Context, tenantId string, failures []CELEvaluationFailure) error {
	if len(failures) == 0 {
		return nil
	}

	sources := make([]string, 0, len(failures))
	errorMessages := make([]string, 0, len(failures))

	for _, failure := range failures {
		// The query takes sources as text; convert the enum to its string form.
		sources = append(sources, string(failure.Source))
		errorMessages = append(errorMessages, failure.ErrorMessage)
	}

	return r.queries.StoreCELEvaluationFailures(ctx, r.pool, sqlcv1.StoreCELEvaluationFailuresParams{
		Tenantid: sqlchelpers.UUIDFromStr(tenantId),
		Sources:  sources,
		Errors:   errorMessages,
	})
}
52 changes: 52 additions & 0 deletions pkg/repository/v1/sqlcv1/models.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

29 changes: 27 additions & 2 deletions pkg/repository/v1/sqlcv1/olap.sql
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ SELECT
-- name: CreateOLAPEventPartitions :exec
SELECT
create_v1_range_partition('v1_events_olap'::text, @date::date),
create_v1_range_partition('v1_event_to_run_olap'::text, @date::date)
create_v1_range_partition('v1_event_to_run_olap'::text, @date::date),
create_v1_range_partition('v1_cel_evaluation_failures_olap'::text, @date::date)
;

-- name: ListOLAPPartitionsBeforeDate :many
Expand All @@ -27,6 +28,8 @@ WITH task_partitions AS (
SELECT 'v1_event_to_run_olap' AS parent_table, p::TEXT AS partition_name FROM get_v1_partitions_before_date('v1_event_to_run_olap', @date::date) AS p
), events_lookup_table_partitions AS (
SELECT 'v1_event_lookup_table_olap' AS parent_table, p::TEXT AS partition_name FROM get_v1_partitions_before_date('v1_event_lookup_table_olap', @date::date) AS p
), cel_evaluation_failures_partitions AS (
SELECT 'v1_cel_evaluation_failures_olap' AS parent_table, p::TEXT AS partition_name FROM get_v1_partitions_before_date('v1_cel_evaluation_failures_olap', @date::date) AS p
), candidates AS (
SELECT
*
Expand Down Expand Up @@ -67,14 +70,21 @@ WITH task_partitions AS (
*
FROM
events_lookup_table_partitions

UNION ALL

SELECT
*
FROM
cel_evaluation_failures_partitions
)

SELECT *
FROM candidates
WHERE
CASE
WHEN @shouldPartitionEventsTables::BOOLEAN THEN TRUE
ELSE parent_table NOT IN ('v1_events_olap', 'v1_event_to_run_olap')
ELSE parent_table NOT IN ('v1_events_olap', 'v1_event_to_run_olap', 'v1_cel_evaluation_failures_olap')
END
;

Expand Down Expand Up @@ -1684,3 +1694,18 @@ FROM
LEFT JOIN
task_times tt ON (td.task_id, td.inserted_at) = (tt.task_id, tt.inserted_at)
ORDER BY td.task_id, td.inserted_at;

-- name: StoreCELEvaluationFailures :exec
-- Bulk-inserts CEL evaluation failures for a single tenant.
-- @sources and @errors are parallel arrays: element i of each describes one
-- failure. @sources arrives as TEXT[] and is cast to the enum array type.
-- The remaining columns (id, inserted_at, updated_at) are not listed here and
-- take their column defaults.
WITH inputs AS (
SELECT
UNNEST(CAST(@sources::TEXT[] AS v1_cel_evaluation_failure_source[])) AS source,
UNNEST(@errors::TEXT[]) AS error
)
INSERT INTO v1_cel_evaluation_failures_olap (
tenant_id,
source,
error
)
SELECT @tenantId::UUID, source, error
FROM inputs
;
Loading
Loading