API Usage#
This guide assumes you have set up a client and are authenticated to the Fleet API. If you haven’t done so, please follow the Basic Client example first.
Fleets#
List all fleets.
fleetsResponse, err := fleetStateClient.ListFleets(
ctx,
&fleetstatev1beta4.ListFleetsRequest{},
)
if err != nil {
log.Fatalf("could not fetch fleets %v", err)
}
log.Printf("Fleets found: %v", jsonMarshalOptions.Format(fleetsResponse))
// List all fleets and print the response as indented JSON.
const fleets = await client.listFleets({});
const fleetsJson = JSON.stringify(fleets, null, 4);
console.log(fleetsJson);
Vehicles#
Request 2 vehicles from the first fleet.
firstFleetId := fleetsResponse.Fleets[0].Id
pageSize := int32(2)
firstFleetVehicles, err := fleetStateClient.ListVehicles(
ctx,
&fleetstatev1beta4.ListVehiclesRequest{
FleetId: firstFleetId,
PageSize: &pageSize,
},
)
if err != nil {
log.Fatalf("could not list vehicles in fleet: %v", err)
}
log.Printf(
"Fleet[%v] Vehicles Page 1: %v",
firstFleetId,
jsonMarshalOptions.Format(firstFleetVehicles),
)
// Fail with a clear message instead of a TypeError when no fleet is returned.
if (fleets.fleets.length === 0) {
  throw new Error("No fleets found, cannot list vehicles");
}
const firstFleetId = fleets.fleets[0].id;
// Request only the first two vehicles of that fleet.
const vehiclesPage1 = await client.listVehicles({
  fleetId: firstFleetId,
  pageSize: 2,
});
console.log(JSON.stringify(vehiclesPage1, null, 4));
Request the remaining vehicles from the fleet using the nextPageToken
from the previous request.
// Only request a second page when the first response carries a next-page token.
if nextPageToken := firstFleetVehicles.NextPageToken; nextPageToken != nil {
	remainingPageSize := int32(100)
	remainingVehiclesRequest := &fleetstatev1beta4.ListVehiclesRequest{
		FleetId:   firstFleetId,
		PageToken: nextPageToken,
		PageSize:  &remainingPageSize,
	}
	secondPage, err := fleetStateClient.ListVehicles(ctx, remainingVehiclesRequest)
	if err != nil {
		log.Fatalf("could not list vehicles in fleet: %v", err)
	}
	log.Printf(
		"Fleet[%v] Vehicles Page 2: %v",
		firstFleetId,
		jsonMarshalOptions.Format(secondPage),
	)
}
// The token may be absent (undefined) rather than "" when there are no further
// pages, so test for any falsy value instead of comparing against "" only.
// NOTE(review): assumes the generated type models the optional field as
// `string | undefined` — confirm against the generated client.
if (!vehiclesPage1.nextPageToken) {
  console.log("No more vehicles");
} else {
  const vehiclesPage2 = await client.listVehicles({
    fleetId: firstFleetId,
    pageToken: vehiclesPage1.nextPageToken,
  });
  console.log(JSON.stringify(vehiclesPage2, null, 4));
}
Events#
The Fleet API also offers server-side streaming endpoints to subscribe to events. Streams can be durable for reliable message delivery or non-durable if the latest message contains the full state.
An example implementation of a non-durable streaming endpoint is shown here using the SubscribeToVehicleTelemetryMovement
endpoint.
For a Vehicle that is currently online and able to fulfill requests, subscribe to the telemetry movement data via the streaming endpoint.
Make sure to specify a timeout deadline for the streaming call and reconnect if needed.
quit := make(chan bool) // channel to close long-running streaming connections
go func() {
	// use a larger timeout for streaming endpoints
	// set the time out up to 1 hour depending on how long you want to keep the stream open
	// and reconnect upon timeout
	ctxStreamingTelemetry, cancelStreaming := context.WithTimeout(context.Background(), streamingTimeout)
	// Wrapped in a closure so that the cancel func of the most recent context
	// (cancelStreaming is reassigned on every reconnect) runs when the goroutine returns.
	defer func() { cancelStreaming() }()

	log.Printf("Waiting for vehicle movements: %v", testVehicleId)
	streamVehicleTelemetry, err := fleetStateClient.SubscribeToVehicleTelemetryMovement(
		ctxStreamingTelemetry,
		&fleetstatev1beta4.SubscribeToVehicleTelemetryMovementRequest{VehicleId: testVehicleId},
	)
	if err != nil {
		log.Fatalf("Could not start stream vehicle movement: %v", err)
	}
	for {
		select {
		case <-quit:
			// close stream on shutdown signal
			// (Recv below blocks, so the signal is only observed between two messages)
			if err := streamVehicleTelemetry.CloseSend(); err != nil {
				log.Fatalf("Could not close stream: %v.", err)
			}
			return
		default:
			movement, err := streamVehicleTelemetry.Recv()
			if err == io.EOF || status.Code(err) == codes.DeadlineExceeded {
				log.Println("VehicleTelemetry Stream closed. Reconnecting...")
				// server or client has closed the stream, reconnect.
				// Cancel the finished stream's context right away; a defer inside this
				// loop would pile up one pending call per reconnect until the goroutine exits.
				cancelStreaming()
				ctxStreamingTelemetry, cancelStreaming = context.WithTimeout(context.Background(), streamingTimeout)
				streamVehicleTelemetry, err = fleetStateClient.SubscribeToVehicleTelemetryMovement(
					ctxStreamingTelemetry,
					&fleetstatev1beta4.SubscribeToVehicleTelemetryMovementRequest{VehicleId: testVehicleId},
				)
				if err != nil {
					log.Fatalf("Could not start stream vehicle movement: %v", err)
				}
			} else if err != nil {
				log.Fatalf("could not stream vehicle movement: %v", err)
			} else {
				handleVehicleTelemetryMessage(testVehicleId, movement)
			}
		}
	}
}()
// A single AbortController is used to stop the long-running streams.
const abortControllerStreams = new AbortController();
const abortSignal = abortControllerStreams.signal

// Subscribes to the telemetry movement stream of testVehicleId and re-subscribes
// whenever the stream ends or its deadline is exceeded. Returns on an abort
// signal and rethrows any other error.
async function streamVehicleTelemetry() {
  for (;;) {
    try {
      const telemetryStream = client.subscribeToVehicleTelemetryMovement(
        { vehicleId: testVehicleId },
        { signal: abortSignal, deadline: streamingTimeoutInMs },
      );
      for await (const telemetryMovement of telemetryStream) {
        // handling of telemetry event, add your consumption logic here
        console.log(JSON.stringify(telemetryMovement, null, 4));
      }
    } catch (error: unknown) {
      const deadlineExceeded =
        error instanceof grpc.ClientError && error.code === grpc.Status.DEADLINE_EXCEEDED;
      if (deadlineExceeded) {
        console.log("VehicleTelemetry Stream closed. Reconnecting...")
        continue;
      }
      if (isAbortError(error)) {
        console.log("Received aborting signal, shutdown vehicle telemetry stream.");
        return
      }
      console.error(error);
      throw error;
    }
  }
}
streamVehicleTelemetry();
Durable streaming endpoints provide you with the option to continue reading a stream after a disconnect.
To not miss any events, it is important to save the lastMessageId
and use it to continue the stream and replay missed events.
In the example below you can see that we initially do not have a lastMessageId
and we do not set it when calling the streaming endpoint.
From then on, we save the lastMessageId
with every incoming message and set it when reconnecting.
Note that you receive a message with a nil event and a message id on initial connection.
An example implementation of a durable streaming endpoint is shown here using the SubscribeToFleetMissionEvents
endpoint.
Here, we reconnect with lastMessageId
once our deadline is exceeded, but the same approach can also be used after an unexpected downtime to replay any missed events.
go func() {
	ctxMissionEventsStreaming, cancelMissionEvents := context.WithTimeout(context.Background(), streamingTimeout)
	// Wrapped in a closure so that the cancel func of the most recent context
	// (cancelMissionEvents is reassigned on every reconnect) runs when the goroutine returns.
	defer func() { cancelMissionEvents() }()

	log.Printf("Waiting for vehicle mission events updates: %v", testVehicleId)
	// When first subscribing to the stream do not specify the lastMessageId.
	// The same fleet id is used here and on reconnect so that the saved
	// message id always belongs to the stream that is being resumed.
	streamMissionEvents, err := fleetStateClient.SubscribeToFleetMissionEvents(
		ctxMissionEventsStreaming,
		&fleetstatev1beta4.SubscribeToFleetMissionEventsRequest{
			FleetId: firstFleetId,
		},
	)
	if err != nil {
		log.Fatalf("Could not start stream mission events: %v", err)
	}
	// Receive the initial message with a null event and with a last message ID to reconnect whenever needed.
	initialMsg, err := streamMissionEvents.Recv()
	if err != nil {
		log.Fatalf("Could not receive initial message: %v", err)
	}
	lastReceivedMessageId := initialMsg.MessageId
	for {
		select {
		case <-quit:
			// close stream on shutdown signal
			// (Recv below blocks, so the signal is only observed between two messages)
			if err := streamMissionEvents.CloseSend(); err != nil {
				log.Fatalf("Could not close stream: %v.", err)
			}
			return
		default:
			msg, err := streamMissionEvents.Recv()
			if err == io.EOF || status.Code(err) == codes.DeadlineExceeded {
				// either server closed stream or timeout reached
				// reconnect with latest known message id to replay events from this point forward
				log.Println("MissionEvent Stream closed. Reconnecting...")
				// Release the finished stream's context before replacing it; discarding
				// the new cancel func would leak the context until its deadline fires.
				cancelMissionEvents()
				ctxMissionEventsStreaming, cancelMissionEvents = context.WithTimeout(context.Background(), streamingTimeout)
				streamMissionEvents, err = fleetStateClient.SubscribeToFleetMissionEvents(
					ctxMissionEventsStreaming,
					&fleetstatev1beta4.SubscribeToFleetMissionEventsRequest{
						FleetId:       firstFleetId,
						LastMessageId: &lastReceivedMessageId,
					},
				)
				if err != nil {
					log.Fatalf("Could not restart stream mission events: %v", err)
				}
			} else if err != nil {
				log.Fatalf("Unexpected error occurred: %v", err)
			} else {
				// custom handling of mission progress event, add your consumption logic here
				handleMissionEventMessage(testVehicleId, msg)
				// save the id of the last processed message to reconnect whenever needed
				lastReceivedMessageId = msg.MessageId
			}
		}
	}
}()
// Subscribes to the mission events of firstFleetId and re-subscribes with the
// id of the last message seen whenever the stream ends or its deadline is
// exceeded. Returns on an abort signal and rethrows any other error.
async function streamVehicleMissionEvents() {
  // Undefined for the first subscription; afterwards the id of the last message seen.
  let lastKnownMessageId: undefined | string = undefined;
  for (;;) {
    try {
      const missionEventStream = client.subscribeToFleetMissionEvents(
        { fleetId: firstFleetId, lastMessageId: lastKnownMessageId },
        { signal: abortSignal, deadline: streamingTimeoutInMs },
      );
      for await (const missionEvent of missionEventStream) {
        // handling of mission progress event, add your consumption logic here
        console.log(JSON.stringify(missionEvent, null, 4));
        // remember where to resume after a reconnect
        lastKnownMessageId = missionEvent.messageId
      }
    } catch (error: unknown) {
      const deadlineExceeded =
        error instanceof grpc.ClientError && error.code === grpc.Status.DEADLINE_EXCEEDED;
      if (deadlineExceeded) {
        console.log("MissionEvent Stream closed. Reconnecting...")
        continue;
      }
      if (isAbortError(error)) {
        console.log("Received aborting signal, shutdown mission event stream.");
        return;
      }
      console.error(error);
      throw error;
    }
  }
}
streamVehicleMissionEvents()
Full Example Source Code#
package main
import (
context "context"
"io"
"log"
"net/http"
"os"
"time"
"golang.org/x/oauth2"
"golang.org/x/oauth2/clientcredentials"
grpc "google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/oauth"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/encoding/protojson"
"moia.io/fleetstate-client/fleetstatev1beta4"
)
const (
// streamingTimeout is the deadline applied to the context of each streaming call.
streamingTimeout = 30 * time.Second
// unaryTimeout is the deadline of the single context shared by the unary calls in main.
unaryTimeout = 5 * time.Second
)
// main walks through the Fleet API example end to end: it fetches an OAuth
// token, connects to the API, lists fleets and vehicles, and then consumes the
// vehicle telemetry and fleet mission event streams for two minutes before
// shutting both streams down.
func main() {
	// Section: Authentication
	clientId := os.Getenv("CLIENT_ID")
	clientSecret := os.Getenv("CLIENT_SECRET")
	const tokenUrl = "https://fleet-api.int.eu-central-1.moia-group.io/" +
		"auth/oauth/token"
	const testVehicleId = "819f0cda-62dd-4251-9efd-9c4294b4f42d"
	oauthConfig := clientcredentials.Config{
		ClientID:     clientId,
		ClientSecret: clientSecret,
		TokenURL:     tokenUrl,
		AuthStyle:    oauth2.AuthStyleInHeader,
	}
	httpCtx := context.Background()
	httpClient := &http.Client{Timeout: 30 * time.Second}
	httpCtx = context.WithValue(httpCtx, oauth2.HTTPClient, httpClient)
	// Used in the next steps
	accessToken, err := oauthConfig.Token(httpCtx)
	if err != nil {
		log.Fatalf("could not fetch token: %v", err)
	}
	// End Section
	// Section: Connect to the Fleet API
	perRPC := oauth.TokenSource{
		TokenSource: oauth2.StaticTokenSource(accessToken),
	}
	const apiUrl = "fleet-api.int.eu-central-1.moia-group.io:443"
	conn, err := grpc.Dial(
		apiUrl,
		grpc.WithTransportCredentials(
			credentials.NewClientTLSFromCert(nil, ""),
		),
		grpc.WithPerRPCCredentials(perRPC),
	)
	if err != nil {
		log.Fatalf("could not connect to %s: %v", apiUrl, err)
	}
	defer conn.Close()
	fleetStateClient := fleetstatev1beta4.NewFleetStateServiceClient(conn)
	// One deadline shared by all unary calls below.
	ctx, cancel := context.WithTimeout(context.Background(), unaryTimeout)
	defer cancel()
	// Used for printing responses
	jsonMarshalOptions := protojson.MarshalOptions{
		Multiline:       true,
		EmitUnpopulated: true,
	}
	// End Section
	// Section: Use fleet API 1
	fleetsResponse, err := fleetStateClient.ListFleets(
		ctx,
		&fleetstatev1beta4.ListFleetsRequest{},
	)
	if err != nil {
		log.Fatalf("could not fetch fleets %v", err)
	}
	log.Printf("Fleets found: %v", jsonMarshalOptions.Format(fleetsResponse))
	// End Section
	// Section: Use fleet API 2
	// Indexing Fleets[0] on an empty slice panics, so fail with a clear message instead.
	if len(fleetsResponse.Fleets) == 0 {
		log.Fatal("no fleets found, cannot list vehicles")
	}
	firstFleetId := fleetsResponse.Fleets[0].Id
	pageSize := int32(2)
	firstFleetVehicles, err := fleetStateClient.ListVehicles(
		ctx,
		&fleetstatev1beta4.ListVehiclesRequest{
			FleetId:  firstFleetId,
			PageSize: &pageSize,
		},
	)
	if err != nil {
		log.Fatalf("could not list vehicles in fleet: %v", err)
	}
	log.Printf(
		"Fleet[%v] Vehicles Page 1: %v",
		firstFleetId,
		jsonMarshalOptions.Format(firstFleetVehicles),
	)
	// End Section
	// Section: Use fleet API 3
	if firstFleetVehicles.NextPageToken != nil {
		pageSize := int32(100)
		firstFleetVehiclesPage2, err := fleetStateClient.ListVehicles(
			ctx,
			&fleetstatev1beta4.ListVehiclesRequest{
				FleetId:   firstFleetId,
				PageToken: firstFleetVehicles.NextPageToken,
				PageSize:  &pageSize,
			},
		)
		if err != nil {
			log.Fatalf("could not list vehicles in fleet: %v", err)
		}
		log.Printf(
			"Fleet[%v] Vehicles Page 2: %v",
			firstFleetId,
			jsonMarshalOptions.Format(firstFleetVehiclesPage2),
		)
	}
	// End Section
	// Section: Use fleet API 4
	quit := make(chan bool) // channel to close long-running streaming connections
	// Each streaming goroutine reports here when it has returned; buffered so a
	// worker never blocks on the report.
	done := make(chan struct{}, 2)
	go func() {
		defer func() { done <- struct{}{} }()
		// use a larger timeout for streaming endpoints
		// set the time out up to 1 hour depending on how long you want to keep the stream open
		// and reconnect upon timeout
		ctxStreamingTelemetry, cancelStreaming := context.WithTimeout(context.Background(), streamingTimeout)
		// Wrapped in a closure so that the cancel func of the most recent context
		// (cancelStreaming is reassigned on every reconnect) runs when the goroutine returns.
		defer func() { cancelStreaming() }()
		log.Printf("Waiting for vehicle movements: %v", testVehicleId)
		streamVehicleTelemetry, err := fleetStateClient.SubscribeToVehicleTelemetryMovement(
			ctxStreamingTelemetry,
			&fleetstatev1beta4.SubscribeToVehicleTelemetryMovementRequest{VehicleId: testVehicleId},
		)
		if err != nil {
			log.Fatalf("Could not start stream vehicle movement: %v", err)
		}
		for {
			select {
			case <-quit:
				// close stream on shutdown signal
				// (Recv below blocks, so the signal is only observed between two messages)
				if err := streamVehicleTelemetry.CloseSend(); err != nil {
					log.Fatalf("Could not close stream: %v.", err)
				}
				return
			default:
				movement, err := streamVehicleTelemetry.Recv()
				if err == io.EOF || status.Code(err) == codes.DeadlineExceeded {
					log.Println("VehicleTelemetry Stream closed. Reconnecting...")
					// server or client has closed the stream, reconnect.
					// Cancel the finished stream's context right away; a defer inside this
					// loop would pile up one pending call per reconnect until the goroutine exits.
					cancelStreaming()
					ctxStreamingTelemetry, cancelStreaming = context.WithTimeout(context.Background(), streamingTimeout)
					streamVehicleTelemetry, err = fleetStateClient.SubscribeToVehicleTelemetryMovement(
						ctxStreamingTelemetry,
						&fleetstatev1beta4.SubscribeToVehicleTelemetryMovementRequest{VehicleId: testVehicleId},
					)
					if err != nil {
						log.Fatalf("Could not start stream vehicle movement: %v", err)
					}
				} else if err != nil {
					log.Fatalf("could not stream vehicle movement: %v", err)
				} else {
					handleVehicleTelemetryMessage(testVehicleId, movement)
				}
			}
		}
	}()
	// End Section
	// Section: Use fleet API 5
	go func() {
		defer func() { done <- struct{}{} }()
		ctxMissionEventsStreaming, cancelMissionEvents := context.WithTimeout(context.Background(), streamingTimeout)
		// Wrapped in a closure so that the cancel func of the most recent context
		// (cancelMissionEvents is reassigned on every reconnect) runs when the goroutine returns.
		defer func() { cancelMissionEvents() }()
		log.Printf("Waiting for vehicle mission events updates: %v", testVehicleId)
		// When first subscribing to the stream do not specify the lastMessageId.
		// The same fleet id is used here and on reconnect so that the saved
		// message id always belongs to the stream that is being resumed.
		streamMissionEvents, err := fleetStateClient.SubscribeToFleetMissionEvents(
			ctxMissionEventsStreaming,
			&fleetstatev1beta4.SubscribeToFleetMissionEventsRequest{
				FleetId: firstFleetId,
			},
		)
		if err != nil {
			log.Fatalf("Could not start stream mission events: %v", err)
		}
		// Receive the initial message with a null event and with a last message ID to reconnect whenever needed.
		initialMsg, err := streamMissionEvents.Recv()
		if err != nil {
			log.Fatalf("Could not receive initial message: %v", err)
		}
		lastReceivedMessageId := initialMsg.MessageId
		for {
			select {
			case <-quit:
				// close stream on shutdown signal
				// (Recv below blocks, so the signal is only observed between two messages)
				if err := streamMissionEvents.CloseSend(); err != nil {
					log.Fatalf("Could not close stream: %v.", err)
				}
				return
			default:
				msg, err := streamMissionEvents.Recv()
				if err == io.EOF || status.Code(err) == codes.DeadlineExceeded {
					// either server closed stream or timeout reached
					// reconnect with latest known message id to replay events from this point forward
					log.Println("MissionEvent Stream closed. Reconnecting...")
					// Release the finished stream's context before replacing it; discarding
					// the new cancel func would leak the context until its deadline fires.
					cancelMissionEvents()
					ctxMissionEventsStreaming, cancelMissionEvents = context.WithTimeout(context.Background(), streamingTimeout)
					streamMissionEvents, err = fleetStateClient.SubscribeToFleetMissionEvents(
						ctxMissionEventsStreaming,
						&fleetstatev1beta4.SubscribeToFleetMissionEventsRequest{
							FleetId:       firstFleetId,
							LastMessageId: &lastReceivedMessageId,
						},
					)
					if err != nil {
						log.Fatalf("Could not restart stream mission events: %v", err)
					}
				} else if err != nil {
					log.Fatalf("Unexpected error occurred: %v", err)
				} else {
					// custom handling of mission progress event, add your consumption logic here
					handleMissionEventMessage(testVehicleId, msg)
					// save the id of the last processed message to reconnect whenever needed
					lastReceivedMessageId = msg.MessageId
				}
			}
		}
	}()
	// End Section
	time.Sleep(2 * time.Minute)
	// shutdown client
	log.Println("Shutdown ")
	// Closing the channel (instead of sending a single value) wakes up both
	// streaming goroutines; a single send would only ever reach one of them.
	close(quit)
	// Wait for both workers so they get the chance to close their streams. A
	// worker notices the signal once its pending Recv returns, i.e. at the
	// latest when the stream deadline (streamingTimeout) is exceeded.
	for i := 0; i < cap(done); i++ {
		<-done
	}
}
// handleMissionEventMessage logs msg as multi-line JSON (unpopulated fields
// included), prefixed with the given vehicle id.
func handleMissionEventMessage(vehicleId string, msg *fleetstatev1beta4.SubscribeToFleetMissionEventsResponse) {
	marshalOptions := protojson.MarshalOptions{
		Multiline:       true,
		EmitUnpopulated: true,
	}
	formatted := marshalOptions.Format(msg)
	log.Printf("Vehicle[%v] Mission Event: %v", vehicleId, formatted)
}
// handleVehicleTelemetryMessage logs msg as multi-line JSON (unpopulated fields
// included), prefixed with the given vehicle id.
func handleVehicleTelemetryMessage(vehicleId string, msg *fleetstatev1beta4.SubscribeToVehicleTelemetryMovementResponse) {
	marshalOptions := protojson.MarshalOptions{
		Multiline:       true,
		EmitUnpopulated: true,
	}
	formatted := marshalOptions.Format(msg)
	log.Printf("Vehicle[%v] Movement: %v", vehicleId, formatted)
}
import * as grpc from "nice-grpc";
import { FleetStateServiceDefinition } from "./fleet_state";
import {isAbortError} from 'abort-controller-x';
import {deadlineMiddleware} from 'nice-grpc-client-middleware-deadline';
import * as http2 from "http2";
const streamingTimeoutInMs = 30 * 1000 // 30s
// Walks through the Fleet API example end to end: fetches an OAuth token,
// connects to the API, lists fleets and vehicles, then consumes the vehicle
// telemetry and fleet mission event streams for two minutes before aborting
// both streams and closing the channel.
async function main() {
  // Section: Authentication
  const clientId = process.env.CLIENT_ID;
  const clientSecret = process.env.CLIENT_SECRET;
  const tokenUrl =
    "https://fleet-api.int.eu-central-1.moia-group.io/" +
    "auth/oauth/token";
  const testVehicleId = "819f0cda-62dd-4251-9efd-9c4294b4f42d";
  const authHeader = Buffer.from(`${clientId}:${clientSecret}`).toString(
    "base64"
  );
  const http2AuthClient = http2.connect(tokenUrl);
  const tokenRequest = http2AuthClient.request({
    ":method": "POST",
    ":path": "/auth/oauth/token",
    "authorization": `Basic ${authHeader}`,
    "content-type": "application/x-www-form-urlencoded",
  });
  tokenRequest.setEncoding("utf8");
  const requestBody = "grant_type=client_credentials";
  tokenRequest.write(requestBody);
  tokenRequest.end();
  let tokenResponse = "";
  for await (const data of tokenRequest) {
    tokenResponse += data;
  }
  // Close the session before inspecting the response, so that the early return
  // below does not leave an open HTTP/2 connection behind.
  http2AuthClient.close();
  const parsedTokenResponse = JSON.parse(tokenResponse) as Record<string, any>;
  if (parsedTokenResponse.error) {
    console.error(parsedTokenResponse.error_description);
    return;
  }
  // Used in the next steps
  const accessToken = parsedTokenResponse.access_token;
  // End Section
  // Section: Connect to the Fleet API
  const apiUrl = "fleet-api.int.eu-central-1.moia-group.io:443";
  const clientFactory = grpc.createClientFactory().use(deadlineMiddleware);
  const channel = grpc.createChannel(
    apiUrl,
    grpc.ChannelCredentials.createSsl()
  );
  const client = clientFactory.create(FleetStateServiceDefinition, channel, {
    "*": {
      metadata: new grpc.Metadata({
        Authorization: `Bearer ${accessToken}`,
      }),
    },
  });
  // End Section
  // Section: Use fleet API 1
  const fleets = await client.listFleets({});
  console.log(JSON.stringify(fleets, null, 4));
  // End Section
  // Section: Use fleet API 2
  // Stop with a clear message instead of a TypeError when no fleet is returned.
  if (fleets.fleets.length === 0) {
    console.error("No fleets found, cannot list vehicles");
    channel.close();
    return;
  }
  const firstFleetId = fleets.fleets[0].id;
  const vehiclesPage1 = await client.listVehicles({
    fleetId: firstFleetId,
    pageSize: 2,
  });
  console.log(JSON.stringify(vehiclesPage1, null, 4));
  // End Section
  // Section: Use fleet API 3
  // The token may be absent (undefined) rather than "" when there are no
  // further pages, so test for any falsy value instead of comparing against "".
  // NOTE(review): assumes the generated type models the optional field as
  // `string | undefined` — confirm against the generated client.
  if (!vehiclesPage1.nextPageToken) {
    console.log("No more vehicles");
  } else {
    const vehiclesPage2 = await client.listVehicles({
      fleetId: firstFleetId,
      pageToken: vehiclesPage1.nextPageToken,
    });
    console.log(JSON.stringify(vehiclesPage2, null, 4));
  }
  // End Section
  // Section: Use fleet API 4
  const abortControllerStreams = new AbortController();
  const abortSignal = abortControllerStreams.signal
  async function streamVehicleTelemetry() {
    while(true) {
      try {
        const streamVehicleTelemetryMovement = client.subscribeToVehicleTelemetryMovement({
          vehicleId: testVehicleId,
        }, { signal: abortSignal, deadline: streamingTimeoutInMs});
        for await (const telemetryMovement of streamVehicleTelemetryMovement) {
          // handling of telemetry event, add your consumption logic here
          console.log(JSON.stringify(telemetryMovement, null, 4));
        }
      } catch (error: unknown) {
        if (error instanceof grpc.ClientError && error.code === grpc.Status.DEADLINE_EXCEEDED) {
          console.log("VehicleTelemetry Stream closed. Reconnecting...")
        } else if (isAbortError(error)) {
          console.log("Received aborting signal, shutdown vehicle telemetry stream.");
          return
        } else {
          console.error(error);
          throw error;
        }
      }
    }
  }
  // Not awaited on purpose: the stream runs in the background until aborted.
  streamVehicleTelemetry();
  // End Section
  // Section: Use fleet API 5
  async function streamVehicleMissionEvents() {
    // When first subscribing to the stream we do not specify a lastMessageId
    let lastKnownMessageId: undefined | string = undefined;
    while(true) {
      try {
        const streamVehicleMissionEvent = client.subscribeToFleetMissionEvents({
          fleetId: firstFleetId,
          lastMessageId: lastKnownMessageId,
        }, { signal: abortSignal, deadline: streamingTimeoutInMs });
        for await (const missionEvent of streamVehicleMissionEvent) {
          // handling of mission progress event, add your consumption logic here
          console.log(JSON.stringify(missionEvent, null, 4));
          // save the id of the last processed message to reconnect whenever needed
          lastKnownMessageId = missionEvent.messageId
        }
      } catch (error: unknown) {
        if (error instanceof grpc.ClientError && error.code === grpc.Status.DEADLINE_EXCEEDED) {
          console.log("MissionEvent Stream closed. Reconnecting...")
        } else if (isAbortError(error)) {
          console.log("Received aborting signal, shutdown mission event stream.");
          return;
        } else {
          console.error(error);
          throw error;
        }
      }
    }
  }
  // Not awaited on purpose: the stream runs in the background until aborted.
  streamVehicleMissionEvents()
  // End Section
  await new Promise(f => setTimeout(f, 2*60*1000)); // sleep for 2 minutes
  console.log('Shutdown client...')
  abortControllerStreams.abort()
  channel.close()
}
main()