Skip to main content
Processors implement the processor interface defined in .tangent/wit/processor.wit.

Interface

The interface has three methods to implement: Metadata, Probe, and ProcessLogs.

Metadata

Name and version of your plugin
import (
    tangent_sdk "github.com/telophasehq/tangent-sdk-go"
)

// Metadata carries the name and version of this plugin.
var Metadata = tangent_sdk.Metadata{Name: "golang", Version: "0.2.0"}

Probe

Used to tell the host which logs to send to the mapper
import (
    tangent_sdk "github.com/telophasehq/tangent-sdk-go"
)

// selectors tells the host which logs to send to the mapper. The single
// selector below lists one predicate: the string at "source.name" must equal
// "myservice".
var selectors = []tangent_sdk.Selector{{
    All: []tangent_sdk.Predicate{
        tangent_sdk.EqString("source.name", "myservice"),
    },
}}
With this selector, the host sends every log that contains either
{"source.name": "myservice"}
or
{"source": {"name": "myservice"}}

ProcessLogs

This is the business logic of your plugin. It takes in a log.Logview, which allows it to read JSON data provided by the host. This function can return scalars, lists, or maps. Performance note: Always prefer to fetch scalar values from the log. Scalars do not create heap allocations. Fetching lists and maps allocates on the heap, which hurts performance.
import (
    tangent_sdk "github.com/telophasehq/tangent-sdk-go"
    "github.com/telophasehq/tangent-sdk-go/helpers"
)

// ExampleOutput is the record produced by the mapper for each log.
type ExampleOutput struct {
    // Msg is serialized under the JSON key "message".
    Msg string `json:"message"`
}

// ExampleMapper builds an ExampleOutput from a single log.
//
// It reads the string field "msg" from lv via helpers.GetString. When that
// lookup yields nil the zero-value output is returned; otherwise the string
// is copied into Msg. The returned error is always nil.
func ExampleMapper(lv tangent_sdk.Log) (ExampleOutput, error) {
    text := helpers.GetString(lv, "msg")
    if text == nil {
        return ExampleOutput{}, nil
    }

    return ExampleOutput{Msg: *text}, nil
}

// init wires the plugin's metadata, selectors, and per-log handler together
// through tangent_sdk.Wire.
func init() {
    // The final argument is left nil: this example supplies only
    // ExampleMapper as its handler.
    tangent_sdk.Wire[ExampleOutput](Metadata, selectors, ExampleMapper, nil)
}

// main is intentionally empty: the plugin is wired up through
// tangent_sdk.Wire in init.
func main() {}

Enriching logs

Plugins can make network calls to enrich logs. In this example, we batch all unique IPs found in the logs, call a remote API once per IP, and merge the results back into the output. The Go, Python, and Rust examples below show the same enrichment behavior.
import (
    "encoding/json"
    "fmt"
    "net/url"

    tangent_sdk "github.com/telophasehq/tangent-sdk-go"
    "github.com/telophasehq/tangent-sdk-go/helpers"
    "github.com/telophasehq/tangent-sdk-go/http"
)

// EnrichedOutput is the record emitted for each log by the batch mapper.
type EnrichedOutput struct {
    // IPAddress is the log's "ip_address" field; empty when the log has none.
    IPAddress string `json:"ip_address"`
    // Country is the "country" value decoded from the remote lookup for
    // IPAddress; empty when the log carried no IP.
    Country   string `json:"country"`
    // Service is the log's "service" field; empty when the log has none.
    Service   string `json:"service"`
}

// ExampleMapper enriches a batch of logs with the country of each log's IP
// address.
//
// For every log it copies the "service" and "ip_address" string fields into
// the output at the same index. One GET request per distinct, non-empty IP is
// built against ipinfo.io and all requests are submitted together through
// http.RemoteCallBatch. The "country" decoded from each response is then
// written to every output that carried that IP. Logs with a missing or empty
// IP keep an empty Country.
//
// It returns one EnrichedOutput per input log, in input order. If the batch
// call fails, or any single response reports an error, a non-200 status, or a
// body that cannot be decoded, it returns a nil slice and an error describing
// the failing IP.
func ExampleMapper(lvs []tangent_sdk.Log) ([]EnrichedOutput, error) {
    outs := make([]EnrichedOutput, len(lvs))

    // ipToIdx maps each distinct IP to the indexes of the logs carrying it,
    // so a single lookup can be fanned back out to every matching output.
    ipToIdx := make(map[string][]int)

    for i, lv := range lvs {
        if svc := helpers.GetString(lv, "service"); svc != nil {
            outs[i].Service = *svc
        }

        ipPtr := helpers.GetString(lv, "ip_address")
        if ipPtr == nil || *ipPtr == "" {
            continue
        }

        ip := *ipPtr
        outs[i].IPAddress = ip
        ipToIdx[ip] = append(ipToIdx[ip], i)
    }

    // Nothing to enrich: skip the remote call entirely.
    if len(ipToIdx) == 0 {
        return outs, nil
    }

    reqs := make([]http.RemoteRequest, 0, len(ipToIdx))
    for ip := range ipToIdx {
        // The IP is a URL path segment, not a query value, so it must be
        // escaped with PathEscape. QueryEscape applies query-string rules
        // (for example, it rewrites the colons of an IPv6 address as %3A and
        // spaces as '+'), which is wrong inside a path.
        u := "https://ipinfo.io/" + url.PathEscape(ip)

        reqs = append(reqs, http.RemoteRequest{
            ID:     ip, // the IP doubles as the key used to match responses
            Method: http.RemoteMethodGet,
            URL:    u,
        })
    }

    resps, err := http.RemoteCallBatch(reqs)
    if err != nil {
        return nil, fmt.Errorf("remote batch call failed: %w", err)
    }

    // ipinfoPayload picks out the only field of the response this mapper uses.
    type ipinfoPayload struct {
        Country string `json:"country"`
    }

    ipToCountry := make(map[string]string, len(resps))

    for _, resp := range resps {
        if resp.Error != nil && *resp.Error != "" {
            return nil, fmt.Errorf("remote error for ip %s: %s", resp.ID, *resp.Error)
        }

        if resp.Status != 200 {
            return nil, fmt.Errorf("remote returned status %d for ip %s", resp.Status, resp.ID)
        }

        var payload ipinfoPayload
        if err := json.Unmarshal(resp.Body, &payload); err != nil {
            return nil, fmt.Errorf("failed to decode ipinfo response for %s: %w", resp.ID, err)
        }

        ipToCountry[resp.ID] = payload.Country
    }

    // Fan each looked-up country back out to every log that shared the IP.
    // An IP with no matching response leaves Country empty.
    for ip, idxs := range ipToIdx {
        country := ipToCountry[ip]
        for _, i := range idxs {
            outs[i].Country = country
        }
    }

    return outs, nil
}

// init wires the plugin's metadata, selectors, and batch handler together
// through tangent_sdk.Wire.
func init() {
    // Argument order is (metadata, selectors, per-log handler, batch handler).
    // Only the batch handler is supplied here, so the per-log slot is nil.
    tangent_sdk.Wire[EnrichedOutput](Metadata, selectors, nil, ExampleMapper)
}

// main is intentionally empty: the plugin is wired up through
// tangent_sdk.Wire in init.
func main() {}

Local dev loop

tangent plugin compile --config tangent.yaml --wit ./.tangent/wit
tangent plugin test --config tangent.yaml
tangent run --config tangent.yaml

# In a separate window
tangent bench --config tangent.yaml --seconds 30 --payload tests/input.json