feat(ops): adapt repository INSERT/SELECT + add setOpsEndpointContext in error logger middleware
This commit is contained in:
@ -27,6 +27,9 @@ const (
|
||||
opsRequestBodyKey = "ops_request_body"
|
||||
opsAccountIDKey = "ops_account_id"
|
||||
|
||||
opsUpstreamModelKey = "ops_upstream_model"
|
||||
opsRequestTypeKey = "ops_request_type"
|
||||
|
||||
// 错误过滤匹配常量 — shouldSkipOpsErrorLog 和错误分类共用
|
||||
opsErrContextCanceled = "context canceled"
|
||||
opsErrNoAvailableAccounts = "no available accounts"
|
||||
@ -345,6 +348,18 @@ func setOpsRequestContext(c *gin.Context, model string, stream bool, requestBody
|
||||
}
|
||||
}
|
||||
|
||||
// setOpsEndpointContext stores upstream model and request type for ops error logging.
|
||||
// Called by handlers after model mapping and request type determination.
|
||||
func setOpsEndpointContext(c *gin.Context, upstreamModel string, requestType int16) {
|
||||
if c == nil {
|
||||
return
|
||||
}
|
||||
if upstreamModel = strings.TrimSpace(upstreamModel); upstreamModel != "" {
|
||||
c.Set(opsUpstreamModelKey, upstreamModel)
|
||||
}
|
||||
c.Set(opsRequestTypeKey, requestType)
|
||||
}
|
||||
|
||||
func attachOpsRequestBodyToEntry(c *gin.Context, entry *service.OpsInsertErrorLogInput) {
|
||||
if c == nil || entry == nil {
|
||||
return
|
||||
@ -628,7 +643,30 @@ func OpsErrorLoggerMiddleware(ops *service.OpsService) gin.HandlerFunc {
|
||||
}
|
||||
return ""
|
||||
}(),
|
||||
Stream: stream,
|
||||
Stream: stream,
|
||||
InboundEndpoint: GetInboundEndpoint(c),
|
||||
UpstreamEndpoint: GetUpstreamEndpoint(c, platform),
|
||||
RequestedModel: modelName,
|
||||
UpstreamModel: func() string {
|
||||
if v, ok := c.Get(opsUpstreamModelKey); ok {
|
||||
if s, ok := v.(string); ok {
|
||||
return strings.TrimSpace(s)
|
||||
}
|
||||
}
|
||||
return ""
|
||||
}(),
|
||||
RequestType: func() *int16 {
|
||||
if v, ok := c.Get(opsRequestTypeKey); ok {
|
||||
switch t := v.(type) {
|
||||
case int16:
|
||||
return &t
|
||||
case int:
|
||||
v16 := int16(t)
|
||||
return &v16
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}(),
|
||||
UserAgent: c.GetHeader("User-Agent"),
|
||||
|
||||
ErrorPhase: "upstream",
|
||||
@ -756,7 +794,30 @@ func OpsErrorLoggerMiddleware(ops *service.OpsService) gin.HandlerFunc {
|
||||
}
|
||||
return ""
|
||||
}(),
|
||||
Stream: stream,
|
||||
Stream: stream,
|
||||
InboundEndpoint: GetInboundEndpoint(c),
|
||||
UpstreamEndpoint: GetUpstreamEndpoint(c, platform),
|
||||
RequestedModel: modelName,
|
||||
UpstreamModel: func() string {
|
||||
if v, ok := c.Get(opsUpstreamModelKey); ok {
|
||||
if s, ok := v.(string); ok {
|
||||
return strings.TrimSpace(s)
|
||||
}
|
||||
}
|
||||
return ""
|
||||
}(),
|
||||
RequestType: func() *int16 {
|
||||
if v, ok := c.Get(opsRequestTypeKey); ok {
|
||||
switch t := v.(type) {
|
||||
case int16:
|
||||
return &t
|
||||
case int:
|
||||
v16 := int16(t)
|
||||
return &v16
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}(),
|
||||
UserAgent: c.GetHeader("User-Agent"),
|
||||
|
||||
ErrorPhase: phase,
|
||||
|
||||
@ -29,6 +29,11 @@ INSERT INTO ops_error_logs (
|
||||
model,
|
||||
request_path,
|
||||
stream,
|
||||
inbound_endpoint,
|
||||
upstream_endpoint,
|
||||
requested_model,
|
||||
upstream_model,
|
||||
request_type,
|
||||
user_agent,
|
||||
error_phase,
|
||||
error_type,
|
||||
@ -57,7 +62,7 @@ INSERT INTO ops_error_logs (
|
||||
retry_count,
|
||||
created_at
|
||||
) VALUES (
|
||||
$1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17,$18,$19,$20,$21,$22,$23,$24,$25,$26,$27,$28,$29,$30,$31,$32,$33,$34,$35,$36,$37,$38
|
||||
$1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17,$18,$19,$20,$21,$22,$23,$24,$25,$26,$27,$28,$29,$30,$31,$32,$33,$34,$35,$36,$37,$38,$39,$40,$41,$42,$43
|
||||
)`
|
||||
|
||||
func NewOpsRepository(db *sql.DB) service.OpsRepository {
|
||||
@ -140,6 +145,11 @@ func opsInsertErrorLogArgs(input *service.OpsInsertErrorLogInput) []any {
|
||||
opsNullString(input.Model),
|
||||
opsNullString(input.RequestPath),
|
||||
input.Stream,
|
||||
opsNullString(input.InboundEndpoint),
|
||||
opsNullString(input.UpstreamEndpoint),
|
||||
opsNullString(input.RequestedModel),
|
||||
opsNullString(input.UpstreamModel),
|
||||
opsNullInt16(input.RequestType),
|
||||
opsNullString(input.UserAgent),
|
||||
input.ErrorPhase,
|
||||
input.ErrorType,
|
||||
@ -231,7 +241,12 @@ SELECT
|
||||
COALESCE(g.name, ''),
|
||||
CASE WHEN e.client_ip IS NULL THEN NULL ELSE e.client_ip::text END,
|
||||
COALESCE(e.request_path, ''),
|
||||
e.stream
|
||||
e.stream,
|
||||
COALESCE(e.inbound_endpoint, ''),
|
||||
COALESCE(e.upstream_endpoint, ''),
|
||||
COALESCE(e.requested_model, ''),
|
||||
COALESCE(e.upstream_model, ''),
|
||||
e.request_type
|
||||
FROM ops_error_logs e
|
||||
LEFT JOIN accounts a ON e.account_id = a.id
|
||||
LEFT JOIN groups g ON e.group_id = g.id
|
||||
@ -263,6 +278,7 @@ LIMIT $` + itoa(len(args)+1) + ` OFFSET $` + itoa(len(args)+2)
|
||||
var resolvedBy sql.NullInt64
|
||||
var resolvedByName string
|
||||
var resolvedRetryID sql.NullInt64
|
||||
var requestType sql.NullInt64
|
||||
if err := rows.Scan(
|
||||
&item.ID,
|
||||
&item.CreatedAt,
|
||||
@ -294,6 +310,11 @@ LIMIT $` + itoa(len(args)+1) + ` OFFSET $` + itoa(len(args)+2)
|
||||
&clientIP,
|
||||
&item.RequestPath,
|
||||
&item.Stream,
|
||||
&item.InboundEndpoint,
|
||||
&item.UpstreamEndpoint,
|
||||
&item.RequestedModel,
|
||||
&item.UpstreamModel,
|
||||
&requestType,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -334,6 +355,10 @@ LIMIT $` + itoa(len(args)+1) + ` OFFSET $` + itoa(len(args)+2)
|
||||
item.GroupID = &v
|
||||
}
|
||||
item.GroupName = groupName
|
||||
if requestType.Valid {
|
||||
v := int16(requestType.Int64)
|
||||
item.RequestType = &v
|
||||
}
|
||||
out = append(out, &item)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
@ -393,6 +418,11 @@ SELECT
|
||||
CASE WHEN e.client_ip IS NULL THEN NULL ELSE e.client_ip::text END,
|
||||
COALESCE(e.request_path, ''),
|
||||
e.stream,
|
||||
COALESCE(e.inbound_endpoint, ''),
|
||||
COALESCE(e.upstream_endpoint, ''),
|
||||
COALESCE(e.requested_model, ''),
|
||||
COALESCE(e.upstream_model, ''),
|
||||
e.request_type,
|
||||
COALESCE(e.user_agent, ''),
|
||||
e.auth_latency_ms,
|
||||
e.routing_latency_ms,
|
||||
@ -427,6 +457,7 @@ LIMIT 1`
|
||||
var responseLatency sql.NullInt64
|
||||
var ttft sql.NullInt64
|
||||
var requestBodyBytes sql.NullInt64
|
||||
var requestType sql.NullInt64
|
||||
|
||||
err := r.db.QueryRowContext(ctx, q, id).Scan(
|
||||
&out.ID,
|
||||
@ -464,6 +495,11 @@ LIMIT 1`
|
||||
&clientIP,
|
||||
&out.RequestPath,
|
||||
&out.Stream,
|
||||
&out.InboundEndpoint,
|
||||
&out.UpstreamEndpoint,
|
||||
&out.RequestedModel,
|
||||
&out.UpstreamModel,
|
||||
&requestType,
|
||||
&out.UserAgent,
|
||||
&authLatency,
|
||||
&routingLatency,
|
||||
@ -540,6 +576,10 @@ LIMIT 1`
|
||||
v := int(requestBodyBytes.Int64)
|
||||
out.RequestBodyBytes = &v
|
||||
}
|
||||
if requestType.Valid {
|
||||
v := int16(requestType.Int64)
|
||||
out.RequestType = &v
|
||||
}
|
||||
|
||||
// Normalize request_body to empty string when stored as JSON null.
|
||||
out.RequestBody = strings.TrimSpace(out.RequestBody)
|
||||
@ -1479,3 +1519,10 @@ func opsNullInt(v any) any {
|
||||
return sql.NullInt64{}
|
||||
}
|
||||
}
|
||||
|
||||
func opsNullInt16(v *int16) any {
|
||||
if v == nil {
|
||||
return sql.NullInt64{}
|
||||
}
|
||||
return sql.NullInt64{Int64: int64(*v), Valid: true}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user