processor interface defined in .tangent/wit/processor.wit.
Interface
The interface has three methods to implement.
Metadata
Name and version of your plugin.
- Go
- Python
- Rust
Copy
import (
tangent_sdk "github.com/telophasehq/tangent-sdk-go"
)
// Metadata declares the name and version of this plugin.
var Metadata = tangent_sdk.Metadata{Name: "golang", Version: "0.2.0"}
Copy
def metadata(self) -> mapper.Meta:
    """Return the plugin's name and version wrapped in a ``mapper.Meta``."""
    plugin_name = "python-example"
    plugin_version = "0.1.0"
    return mapper.Meta(name=plugin_name, version=plugin_version)
Copy
use exports::tangent::logs::mapper::Meta;
/// Reports the plugin's name and version.
fn metadata() -> Meta {
    let name = String::from("rust");
    let version = String::from("0.1.0");
    Meta { name, version }
}
Probe
Used to tell the host which logs to send to the mapper.
- Go
- Python
- Rust
Copy
import (
tangent_sdk "github.com/telophasehq/tangent-sdk-go"
)
// selectors tells the host which logs to send to the mapper. The single
// selector requires the string field "source.name" to equal "myservice".
var selectors = []tangent_sdk.Selector{
	{All: []tangent_sdk.Predicate{tangent_sdk.EqString("source.name", "myservice")}},
}
Copy
def probe(self) -> List[mapper.Selector]:
    """Return the selectors telling the host which logs to send to the mapper.

    A single selector is returned. Its ``all`` list holds one equality
    predicate (source.name == "myservice"); ``any`` and ``none`` are empty.
    """
    source_is_myservice = mapper.Pred_Eq(
        ("source.name", log.Scalar_Str("myservice"))
    )
    selector = mapper.Selector(any=[], all=[source_is_myservice], none=[])
    return [selector]
Copy
use exports::tangent::logs::mapper::{Pred, Selector};
use tangent::logs::log::Scalar;
/// Returns the selectors telling the host which logs to send to the mapper.
///
/// One selector is returned: its `all` list holds a single equality predicate
/// on `source.name` against the string "myservice"; `any` and `none` are empty.
fn probe() -> Vec<Selector> {
    let source_is_myservice = Pred::Eq((
        String::from("source.name"),
        Scalar::Str(String::from("myservice")),
    ));
    let selector = Selector {
        any: vec![],
        all: vec![source_is_myservice],
        none: vec![],
    };
    vec![selector]
}
Copy
{"source.name": "myservice"}
Copy
{"source": {"name": "my service"}}
ProcessLogs
This is the business logic of your plugin. It takes in a log.Logview, which allows it to read JSON data provided by the host. This function can return scalars, lists, or maps.
Performance note: Always prefer to fetch scalar values from the log. Scalars do not create heap allocations. Fetching lists and maps allocates on the heap, which hurts performance.
- Go
- Python
- Rust
Copy
import (
tangent_sdk "github.com/telophasehq/tangent-sdk-go"
"github.com/telophasehq/tangent-sdk-go/helpers"
)
// ExampleOutput is the record emitted for each processed log.
type ExampleOutput struct {
	// Msg is encoded under the JSON key "message".
	Msg string `json:"message"`
}
// ExampleMapper copies the log's "msg" string into the output's Msg field.
// When helpers.GetString returns nil the output keeps its zero value.
// The returned error is always nil.
func ExampleMapper(lv tangent_sdk.Log) (ExampleOutput, error) {
	if msg := helpers.GetString(lv, "msg"); msg != nil {
		return ExampleOutput{Msg: *msg}, nil
	}
	return ExampleOutput{}, nil
}
// init wires the plugin together: it hands Metadata, selectors, and
// ExampleMapper to tangent_sdk.Wire, leaving the final handler argument nil.
func init() {
	tangent_sdk.Wire[ExampleOutput](Metadata, selectors, ExampleMapper, nil)
}

// main is empty; all registration happens in init.
func main() {}
Copy
def process_logs(self, logs: List[log.Logview]) -> bytes:
    """Emit one JSON line per log containing the log's "msg" value.

    Args:
        logs: Log views supplied by the host. Each one is used as a context
            manager while its fields are read.

    Returns:
        Newline-terminated JSON objects of the form ``{"message": <msg>}``,
        concatenated in input order. ``message`` is the empty string when
        ``lv.get("msg")`` returns ``None``. Empty input yields ``b""``.
    """
    buf = bytearray()
    for lv in logs:
        with lv:
            msg = lv.get("msg")
            value = msg.value if msg is not None else ""
            # Append to `buf`, the accumulator that is returned below.
            buf.extend(json.dumps({"message": value}).encode() + b"\n")
    return bytes(buf)
Copy
use exports::tangent::logs::mapper::Guest;
use tangent::logs::log::{Logview, Scalar};
/// Record emitted for each processed log; serialized with a single
/// `message` field.
#[derive(Default, Serialize)]
struct ExampleOutput {
    /// Defaults to the empty string via `Default`.
    message: String,
}
fn process_logs(input: Vec<Logview>) -> Result<Vec<u8>, String> {
let mut buf = Vec::new();
for lv in input {
let mut out = ExampleOutput::default();
if let Some(val) = lv.get("msg").and_then(string_from_scalar) {
out.message = val;
}
let json_line = serde_json::to_vec(&out).map_err(|e| e.to_string())?;
buf.extend(json_line);
buf.push(b'\n');
}
Ok(buf)
}
Enriching logs
Plugins can make network calls to enrich logs. In this example, we batch all unique IPs found in the logs, call a remote API once per IP, and merge the results back into the output. The Go, Python, and Rust examples below show the same enrichment behavior.
- Go
- Python
- Rust
Copy
import (
"encoding/json"
"fmt"
"net/url"
tangent_sdk "github.com/telophasehq/tangent-sdk-go"
"github.com/telophasehq/tangent-sdk-go/helpers"
"github.com/telophasehq/tangent-sdk-go/http"
)
// EnrichedOutput is the record emitted for each log after enrichment.
type EnrichedOutput struct {
	// IPAddress is encoded under the JSON key "ip_address".
	IPAddress string `json:"ip_address"`
	// Country is encoded under the JSON key "country".
	Country string `json:"country"`
	// Service is encoded under the JSON key "service".
	Service string `json:"service"`
}
// ExampleMapper builds one EnrichedOutput per log and fills in Country by
// looking up every distinct "ip_address" value with one batched remote call.
//
// Service and IPAddress are copied from the log's "service" and "ip_address"
// string fields. Logs with a missing or empty "ip_address" are left without
// a country. If no log carries an IP, no remote call is made.
//
// It returns a nil slice and an error when the batch call fails, when any
// response reports an error or a status other than 200, or when a response
// body cannot be decoded as JSON.
func ExampleMapper(lvs []tangent_sdk.Log) ([]EnrichedOutput, error) {
	outs := make([]EnrichedOutput, len(lvs))
	// ipToIdx groups output indexes by IP so each distinct IP is requested
	// only once, however many logs mention it.
	ipToIdx := make(map[string][]int)
	for i, lv := range lvs {
		if svc := helpers.GetString(lv, "service"); svc != nil {
			outs[i].Service = *svc
		}
		ipPtr := helpers.GetString(lv, "ip_address")
		if ipPtr == nil || *ipPtr == "" {
			continue
		}
		ip := *ipPtr
		outs[i].IPAddress = ip
		ipToIdx[ip] = append(ipToIdx[ip], i)
	}
	if len(ipToIdx) == 0 {
		return outs, nil
	}

	reqs := make([]http.RemoteRequest, 0, len(ipToIdx))
	for ip := range ipToIdx {
		reqs = append(reqs, http.RemoteRequest{
			// The IP doubles as the request ID so responses can be matched
			// back to it.
			ID:     ip,
			Method: http.RemoteMethodGet,
			// The IP is placed in the URL path, so it is escaped as a path
			// segment. QueryEscape applies form encoding (space becomes
			// "+"), which is meant for query strings, not paths.
			URL: "https://ipinfo.io/" + url.PathEscape(ip),
		})
	}

	resps, err := http.RemoteCallBatch(reqs)
	if err != nil {
		return nil, fmt.Errorf("remote batch call failed: %w", err)
	}

	type ipinfoPayload struct {
		Country string `json:"country"`
	}
	ipToCountry := make(map[string]string, len(resps))
	for _, resp := range resps {
		if resp.Error != nil && *resp.Error != "" {
			return nil, fmt.Errorf("remote error for ip %s: %s", resp.ID, *resp.Error)
		}
		if resp.Status != 200 {
			return nil, fmt.Errorf("remote returned status %d for ip %s", resp.Status, resp.ID)
		}
		var payload ipinfoPayload
		if err := json.Unmarshal(resp.Body, &payload); err != nil {
			return nil, fmt.Errorf("failed to decode ipinfo response for %s: %w", resp.ID, err)
		}
		ipToCountry[resp.ID] = payload.Country
	}

	// An IP with no matching response gets the empty string.
	for ip, idxs := range ipToIdx {
		country := ipToCountry[ip]
		for _, i := range idxs {
			outs[i].Country = country
		}
	}
	return outs, nil
}
// init wires the plugin together. The per-log handler argument is nil and
// ExampleMapper is passed as the batch handler.
func init() {
	tangent_sdk.Wire[EnrichedOutput](Metadata, selectors, nil, ExampleMapper)
}

// main is empty; all registration happens in init.
func main() {}
Copy
import json
from typing import List, Dict
from wit_world.exports import mapper
from wit_world.imports import log, http
def process_logs(self, logs: List[log.Logview]) -> bytes:
    """Enrich each log with the country of its IP address and emit JSON lines.

    For every log, the "service" and "ip_address" fields are read. All
    distinct non-empty IPs are looked up with a single batched remote call,
    and the resulting country is written back to every log that had that IP.

    Args:
        logs: Log views supplied by the host. Each one is used as a context
            manager while its fields are read.

    Returns:
        One newline-terminated JSON object per log, in input order, with the
        keys "service", "ip_address" and "country". Empty input yields
        ``b""``. No remote call is made when no log carries an IP.

    Raises:
        Exception: if any response reports an error or a status other
            than 200.
    """
    # Imported locally so this snippet needs no extra module-level import.
    from urllib.parse import quote

    def to_jsonl(entries) -> bytes:
        # A single serializer for both return paths, so empty input yields
        # b"" on either path rather than a stray newline.
        return b"".join(json.dumps(e).encode() + b"\n" for e in entries)

    outs = []
    # Output indexes grouped by IP, so each distinct IP is requested once.
    ip_to_idxs: Dict[str, List[int]] = {}

    # 1. Extract IPs & service names
    for i, lv in enumerate(logs):
        with lv:
            service = lv.get("service")
            ip = lv.get("ip_address")
            outs.append(
                {
                    "service": service.value if service else "",
                    "ip_address": ip.value if ip else "",
                    "country": "",
                }
            )
            if ip and ip.value:
                ip_to_idxs.setdefault(ip.value, []).append(i)

    if not ip_to_idxs:
        return to_jsonl(outs)

    # 2. Build remote requests. The IP comes from log data, so it is escaped
    #    as a URL path segment; ":" stays literal so IPv6 addresses remain
    #    readable.
    reqs = [
        http.RemoteRequest(
            id=ip,
            method=http.RemoteMethod.GET,
            url=f"https://ipinfo.io/{quote(ip, safe=':')}",
        )
        for ip in ip_to_idxs
    ]

    # 3. Make batched remote calls
    resps = http.remote_call_batch(reqs)
    ip_to_country = {}
    for resp in resps:
        if resp.error is not None and resp.error != "":
            raise Exception(f"remote error for ip {resp.id}: {resp.error}")
        if resp.status != 200:
            raise Exception(f"unexpected status {resp.status} for ip {resp.id}")
        payload = json.loads(resp.body.decode())
        ip_to_country[resp.id] = payload.get("country", "")

    # 4. Assign country back to outputs; IPs with no response stay "".
    for ip, idxs in ip_to_idxs.items():
        for i in idxs:
            outs[i]["country"] = ip_to_country.get(ip, "")

    # 5. Return JSON lines
    return to_jsonl(outs)
Copy
use serde::{Deserialize, Serialize};
use tangent::logs::log::Logview;
use exports::tangent::logs::mapper::http::{RemoteMethod, RemoteRequest, remote_call_batch};
/// Record emitted for each log after enrichment. Every field defaults to
/// the empty string via `Default`.
#[derive(Default, Serialize)]
struct EnrichedOutput {
    /// Value of the log's "ip_address" field.
    ip_address: String,
    /// Country returned by the remote lookup for `ip_address`.
    country: String,
    /// Value of the log's "service" field.
    service: String,
}
fn process_logs(input: Vec<Logview>) -> Result<Vec<u8>, String> {
let mut outs = Vec::<EnrichedOutput>::with_capacity(input.len());
let mut ip_to_idxs: std::collections::HashMap<String, Vec<usize>> = std::collections::HashMap::new();
// 1. Extract IPs & services
for (i, lv) in input.iter().enumerate() {
let mut out = EnrichedOutput::default();
if let Some(svc) = lv.get("service").and_then(string_from_scalar) {
out.service = svc;
}
if let Some(ip) = lv.get("ip_address").and_then(string_from_scalar) {
if !ip.is_empty() {
ip_to_idxs.entry(ip.clone()).or_default().push(i);
}
out.ip_address = ip;
}
outs.push(out);
}
// If no IPs, return quickly
if ip_to_idxs.is_empty() {
let mut buf = Vec::new();
for o in outs {
let line = serde_json::to_vec(&o).map_err(|e| e.to_string())?;
buf.extend(line);
buf.push(b'\n');
}
return Ok(buf);
}
// 2. Build remote requests
let reqs: Vec<RemoteRequest> = ip_to_idxs
.keys()
.map(|ip| RemoteRequest {
id: ip.clone(),
method: RemoteMethod::Get,
url: format!("https://ipinfo.io/{}", ip),
})
.collect();
// 3. Make batched calls
let responses = remote_call_batch(&reqs).map_err(|e| e.to_string())?;
let mut ip_to_country = std::collections::HashMap::new();
#[derive(Deserialize)]
struct IpInfoPayload {
country: Option<String>,
}
// 4. Validate & parse responses
for resp in responses {
if let Some(err) = resp.error {
if !err.is_empty() {
return Err(format!("remote error for {}: {}", resp.id, err));
}
}
if resp.status != 200 {
return Err(format!(
"unexpected status {} for ip {}",
resp.status, resp.id
));
}
let payload: IpInfoPayload =
serde_json::from_slice(&resp.body).map_err(|e| e.to_string())?;
ip_to_country.insert(resp.id, payload.country.unwrap_or_default());
}
// 5. Attach countries to outputs
for (ip, idxs) in ip_to_idxs {
let country = ip_to_country.get(&ip).cloned().unwrap_or_default();
for idx in idxs {
outs[idx].country = country.clone();
}
}
// 6. Return JSONL
let mut buf = Vec::new();
for o in outs {
let line = serde_json::to_vec(&o).map_err(|e| e.to_string())?;
buf.extend(line);
buf.push(b'\n');
}
Ok(buf)
}
Local dev loop
Copy
tangent plugin compile --config tangent.yaml --wit ./.tangent/wit
tangent plugin test --config tangent.yaml
tangent run --config tangent.yaml
# In a separate window
tangent bench --config tangent.yaml --seconds 30 --payload tests/input.json