API Usage#

This guide asumes you setup a client and are authenticated to the Fleet API. If you haven’t done so, please follow the Basic Client example first.

Fleets#

List all fleets.

fleetsResponse, err := fleetStateClient.ListFleets(
	ctx,
	&fleetstatev1beta4.ListFleetsRequest{},
)
if err != nil {
	log.Fatalf("could not fetch fleets %v", err)
}
log.Printf("Fleets found: %v", jsonMarshalOptions.Format(fleetsResponse))
const fleets = await client.listFleets({});
console.log(JSON.stringify(fleets, null, 4));

Vehicles#

Request 2 vehicles from the first fleet.

firstFleetId := fleetsResponse.Fleets[0].Id

pageSize := int32(2)
firstFleetVehicles, err := fleetStateClient.ListVehicles(
	ctx,
	&fleetstatev1beta4.ListVehiclesRequest{
		FleetId:  firstFleetId,
		PageSize: &pageSize,
	},
)
if err != nil {
	log.Fatalf("could not list vehicles in fleet: %v", err)
}

log.Printf(
	"Fleet[%v] Vehicles Page 1: %v",
	firstFleetId,
	jsonMarshalOptions.Format(firstFleetVehicles),
)
const firstFleetId = fleets.fleets[0].id;

const vehiclesPage1 = await client.listVehicles({
    fleetId: firstFleetId,
    pageSize: 2,
});
console.log(JSON.stringify(vehiclesPage1, null, 4));

Request the remaining vehicles from the fleet using the nextPageToken from the previous request.

if firstFleetVehicles.NextPageToken != nil {
	pageSize := int32(100)
	firstFleetVehiclesPage2, err := fleetStateClient.ListVehicles(
		ctx,
		&fleetstatev1beta4.ListVehiclesRequest{
			FleetId:   firstFleetId,
			PageToken: firstFleetVehicles.NextPageToken,
			PageSize:  &pageSize,
		},
	)
	if err != nil {
		log.Fatalf("could not list vehicles in fleet: %v", err)
	}
	log.Printf(
		"Fleet[%v] Vehicles Page 2: %v",
		firstFleetId,
		jsonMarshalOptions.Format(firstFleetVehiclesPage2),
	)
}
if (vehiclesPage1.nextPageToken === "") {
    console.log("No more vehicles");
} else {
    const vehiclesPage2 = await client.listVehicles({
        fleetId: fleets.fleets[0].id,
        pageToken: vehiclesPage1.nextPageToken,
    });
    console.log(JSON.stringify(vehiclesPage2, null, 4));    
}

Events#

The Fleet API also offers server-side streaming endpoints to subscribe to events. Streams can be durable for reliable message delivery or non-durable if the latest message contains the full state.

An example implementation of a non-durable streaming endpoint is shown here using the SubscribeToVehicleTelemetryMovement endpoint. For a Vehicle that is currently online and able to fulfill requests, subscribe to the telemetry movement data via the streaming endpoint. Make sure to specify a timeout deadline for the streaming call and reconnect if needed.

quit := make(chan bool) // channel to close long-running streaming connections

go func() {
	// use a larger timeout for streaming endpoints
	// set the time out up to 1 hour depending on how long you want to keep the stream open
	// and reconnect upon timeout
	ctxStreamingTelemetry, cancelStreaming := context.WithTimeout(context.Background(), streamingTimeout)
	defer cancelStreaming()

	log.Printf("Waiting for vehicle movements: %v", testVehicleId)
	streamVehicleTelemetry, err := fleetStateClient.SubscribeToVehicleTelemetryMovement(ctxStreamingTelemetry, &fleetstatev1beta4.SubscribeToVehicleTelemetryMovementRequest{VehicleId: testVehicleId})
	if err != nil {
		log.Fatalf("Could not start stream vehicle movement: %v", err)
	}

	for {
		select {
		case <-quit:
			// close stream on shutdown signal
			err := streamVehicleTelemetry.CloseSend()
			if err != nil {
				log.Fatalf("Could not close stream: %v.", err)
			}
			return
		default:
			movement, err := streamVehicleTelemetry.Recv()
			if err == io.EOF || status.Code(err) == codes.DeadlineExceeded {
				log.Println("VehicleTelemetry Stream closed. Reconnecting...")

				// server or client has closed the stream, reconnect
				ctxStreamingTelemetry, cancelStreaming = context.WithTimeout(context.Background(), streamingTimeout)
				defer cancelStreaming()

				streamVehicleTelemetry, err = fleetStateClient.SubscribeToVehicleTelemetryMovement(ctxStreamingTelemetry, &fleetstatev1beta4.SubscribeToVehicleTelemetryMovementRequest{VehicleId: testVehicleId})
				if err != nil {
					log.Fatalf("Could not start stream vehicle movement: %v", err)
				}
			} else if err != nil {
				log.Fatalf("could not stream vehicle movement: %v", err)
			} else {
				handleVehicleTelemetryMessage(testVehicleId, movement)
			}
		}
	}
}()
const abortControllerStreams = new AbortController();
const abortSignal = abortControllerStreams.signal

async function streamVehicleTelemetry() {
    while(true) {
        try {
            const streamVehicleTelemetryMovement = client.subscribeToVehicleTelemetryMovement({
                vehicleId: testVehicleId,
            }, { signal: abortSignal, deadline: streamingTimeoutInMs}); 
        
            for await (const telemetryMovement of streamVehicleTelemetryMovement) {
                // handling of telemetry event, add your consumption logic here
                console.log(JSON.stringify(telemetryMovement, null, 4));
            }
    
        } catch (error: unknown) {
            if (error instanceof grpc.ClientError && error.code === grpc.Status.DEADLINE_EXCEEDED) {
                console.log("VehicleTelemetry Stream closed. Reconnecting...")
            } else if (isAbortError(error)) {
                console.log("Received aborting signal, shutdown vehicle telemetry stream.");
                return
            } else {
                console.error(error);
                throw error;
            }
        }
    }         
}
streamVehicleTelemetry();

Durable streaming endpoints provide you with the option to continue reading a stream after a disconnect. To not miss any events, it is important to save the lastMessageId and use it to continue the stream and replay missed events. In the example below you can see that we initially do not have a lastMessageId and we do not set it when calling the streaming endpoint. From that time we save the lastMessageId with every incoming message and set it when reconnecting. Note that you receive a message with a nil event and a message id on initial connection.

An example implementation of a durable streaming endpoint is shown here using the SubscribeToFleetMissionEvents endpoint. Here, we reconnect with lastMessageId once our deadline exceeded but the same approach can also be used after an unexpected downtime to replay any missed events.

go func() {
	ctxMissionEventsStreaming, cancel := context.WithTimeout(context.Background(), streamingTimeout)
	defer cancel()

	log.Printf("Waiting for vehicle mission events updates: %v", testVehicleId)
	// When first subscribing to the stream do not specify the lastMessageId
	streamMissionEvents, err := fleetStateClient.SubscribeToFleetMissionEvents(
		ctxMissionEventsStreaming,
		&fleetstatev1beta4.SubscribeToFleetMissionEventsRequest{
			FleetId: "bb860891-5198-4d1d-8f75-45efd600e710",
		},
	)
	if err != nil {
		log.Fatalf("Could not start stream mission events: %v", err)
	}

	// Receive the initial message with a null event and with a last message ID to reconnect whenever needed.
	initialMsg, err := streamMissionEvents.Recv()
	if err != nil {
		log.Fatalf("Could not receive initial message.")
	}

	lastReceivedMessageId := initialMsg.MessageId

	for {
		select {
		case <-quit:
			// close stream on shutdown signal
			err := streamMissionEvents.CloseSend()
			if err != nil {
				log.Fatalf("Could not close stream: %v.", err)
			}
			return
		default:
			msg, err := streamMissionEvents.Recv()

			if err == io.EOF || status.Code(err) == codes.DeadlineExceeded {
				// either server closed stream or timeout reached
				// reconnect with latest known message id to replay events from this point forward
				log.Println("Mission EventSream closed. Reconnecting...")
				ctxMissionEventsStreaming, _ = context.WithTimeout(context.Background(), streamingTimeout)
				streamMissionEvents, err = fleetStateClient.SubscribeToFleetMissionEvents(
					ctxMissionEventsStreaming,
					&fleetstatev1beta4.SubscribeToFleetMissionEventsRequest{
						FleetId:       firstFleetId,
						LastMessageId: &lastReceivedMessageId,
					},
				)
				if err != nil {
					log.Fatalf("Could not receive initial message: %v.", err)
				}
			} else if err != nil {
				log.Fatalf("Unexpected error occured: %v", err)
			} else {
				// custom handling of mission progress event, add your consumption logic here
				handleMissionEventMessage(testVehicleId, msg)
				// save the id of the last processed message to reconnect whenever needed
				lastReceivedMessageId = msg.MessageId
			}
		}
	}
}()
async function streamVehicleMissionEvents() {
    // When first subscribing to the stream we do not specify a lastMessageId
    let lastKnownMessageId: undefined | string = undefined;
    while(true) {
        try {
            const streamVehicleMissionEvent = client.subscribeToFleetMissionEvents({
                fleetId: firstFleetId,
                lastMessageId: lastKnownMessageId,
            }, { signal: abortSignal, deadline: streamingTimeoutInMs }); 
        
            for await (const missionEvent of streamVehicleMissionEvent) {
                // handling of mission progress event, add your consumption logic here
                console.log(JSON.stringify(missionEvent, null, 4));

	// save the id of the last processed message to reconnect whenever needed
                lastKnownMessageId = missionEvent.messageId
            }
    
        } catch (error: unknown) {
            if (error instanceof grpc.ClientError && error.code === grpc.Status.DEADLINE_EXCEEDED) {
                console.log("MissionEvent Stream closed. Reconnecting...")
            } else if (isAbortError(error)) {
                console.log("Received aborting signal, shutdown mission event stream.");
                return;
            } else {
                console.error(error);
                throw error;
            }
        }
    }             
}
streamVehicleMissionEvents()

Full Example Source Code#

package main

import (
	context "context"
	"io"
	"log"
	"net/http"
	"os"
	"time"

	"golang.org/x/oauth2"
	"golang.org/x/oauth2/clientcredentials"
	grpc "google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/credentials/oauth"
	"google.golang.org/grpc/status"
	"google.golang.org/protobuf/encoding/protojson"
	"moia.io/fleetstate-client/fleetstatev1beta4"
)

const (
	streamingTimeout = 30 * time.Second
	unaryTimeout     = 5 * time.Second
)

func main() {
	// Section: Authentication
	clientId := os.Getenv("CLIENT_ID")
	clientSecret := os.Getenv("CLIENT_SECRET")
	const tokenUrl = "https://fleet-api.int.eu-central-1.moia-group.io/" +
		"auth/oauth/token"
	const testVehicleId = "819f0cda-62dd-4251-9efd-9c4294b4f42d"

	oauthConfig := clientcredentials.Config{
		ClientID:     clientId,
		ClientSecret: clientSecret,
		TokenURL:     tokenUrl,
		AuthStyle:    oauth2.AuthStyleInHeader,
	}

	httpCtx := context.Background()
	httpClient := &http.Client{Timeout: 30 * time.Second}
	httpCtx = context.WithValue(httpCtx, oauth2.HTTPClient, httpClient)

	// Used in the next steps
	accessToken, err := oauthConfig.Token(httpCtx)

	if err != nil {
		log.Fatalf("could not fetch token: %v", err)
	}
	// End Section

	// Section: Connect to the Fleet API
	perRPC := oauth.TokenSource{
		TokenSource: oauth2.StaticTokenSource(accessToken),
	}

	const apiUrl = "fleet-api.int.eu-central-1.moia-group.io:443"

	conn, err := grpc.Dial(
		apiUrl,
		grpc.WithTransportCredentials(
			credentials.NewClientTLSFromCert(nil, ""),
		),
		grpc.WithPerRPCCredentials(perRPC),
	)
	if err != nil {
		log.Fatalf("could not connect to %s: %v", apiUrl, err)
	}
	defer conn.Close()

	fleetStateClient := fleetstatev1beta4.NewFleetStateServiceClient(conn)

	ctx, cancel := context.WithTimeout(context.Background(), unaryTimeout)
	defer cancel()

	// Used for printing responses
	jsonMarshalOptions := protojson.MarshalOptions{
		Multiline:       true,
		EmitUnpopulated: true,
	}
	// End Section

	// Section: Use fleet API 1
	fleetsResponse, err := fleetStateClient.ListFleets(
		ctx,
		&fleetstatev1beta4.ListFleetsRequest{},
	)
	if err != nil {
		log.Fatalf("could not fetch fleets %v", err)
	}
	log.Printf("Fleets found: %v", jsonMarshalOptions.Format(fleetsResponse))
	// End Section

	// Section: Use fleet API 2
	firstFleetId := fleetsResponse.Fleets[0].Id

	pageSize := int32(2)
	firstFleetVehicles, err := fleetStateClient.ListVehicles(
		ctx,
		&fleetstatev1beta4.ListVehiclesRequest{
			FleetId:  firstFleetId,
			PageSize: &pageSize,
		},
	)
	if err != nil {
		log.Fatalf("could not list vehicles in fleet: %v", err)
	}

	log.Printf(
		"Fleet[%v] Vehicles Page 1: %v",
		firstFleetId,
		jsonMarshalOptions.Format(firstFleetVehicles),
	)
	// End Section

	// Section: Use fleet API 3
	if firstFleetVehicles.NextPageToken != nil {
		pageSize := int32(100)
		firstFleetVehiclesPage2, err := fleetStateClient.ListVehicles(
			ctx,
			&fleetstatev1beta4.ListVehiclesRequest{
				FleetId:   firstFleetId,
				PageToken: firstFleetVehicles.NextPageToken,
				PageSize:  &pageSize,
			},
		)
		if err != nil {
			log.Fatalf("could not list vehicles in fleet: %v", err)
		}
		log.Printf(
			"Fleet[%v] Vehicles Page 2: %v",
			firstFleetId,
			jsonMarshalOptions.Format(firstFleetVehiclesPage2),
		)
	}
	// End Section

	// Section: Use fleet API 4
	quit := make(chan bool) // channel to close long-running streaming connections

	go func() {
		// use a larger timeout for streaming endpoints
		// set the time out up to 1 hour depending on how long you want to keep the stream open
		// and reconnect upon timeout
		ctxStreamingTelemetry, cancelStreaming := context.WithTimeout(context.Background(), streamingTimeout)
		defer cancelStreaming()

		log.Printf("Waiting for vehicle movements: %v", testVehicleId)
		streamVehicleTelemetry, err := fleetStateClient.SubscribeToVehicleTelemetryMovement(ctxStreamingTelemetry, &fleetstatev1beta4.SubscribeToVehicleTelemetryMovementRequest{VehicleId: testVehicleId})
		if err != nil {
			log.Fatalf("Could not start stream vehicle movement: %v", err)
		}

		for {
			select {
			case <-quit:
				// close stream on shutdown signal
				err := streamVehicleTelemetry.CloseSend()
				if err != nil {
					log.Fatalf("Could not close stream: %v.", err)
				}
				return
			default:
				movement, err := streamVehicleTelemetry.Recv()
				if err == io.EOF || status.Code(err) == codes.DeadlineExceeded {
					log.Println("VehicleTelemetry Stream closed. Reconnecting...")

					// server or client has closed the stream, reconnect
					ctxStreamingTelemetry, cancelStreaming = context.WithTimeout(context.Background(), streamingTimeout)
					defer cancelStreaming()

					streamVehicleTelemetry, err = fleetStateClient.SubscribeToVehicleTelemetryMovement(ctxStreamingTelemetry, &fleetstatev1beta4.SubscribeToVehicleTelemetryMovementRequest{VehicleId: testVehicleId})
					if err != nil {
						log.Fatalf("Could not start stream vehicle movement: %v", err)
					}
				} else if err != nil {
					log.Fatalf("could not stream vehicle movement: %v", err)
				} else {
					handleVehicleTelemetryMessage(testVehicleId, movement)
				}
			}
		}
	}()
	// End Section

	// Section: Use fleet API 5

	go func() {
		ctxMissionEventsStreaming, cancel := context.WithTimeout(context.Background(), streamingTimeout)
		defer cancel()

		log.Printf("Waiting for vehicle mission events updates: %v", testVehicleId)
		// When first subscribing to the stream do not specify the lastMessageId
		streamMissionEvents, err := fleetStateClient.SubscribeToFleetMissionEvents(
			ctxMissionEventsStreaming,
			&fleetstatev1beta4.SubscribeToFleetMissionEventsRequest{
				FleetId: "bb860891-5198-4d1d-8f75-45efd600e710",
			},
		)
		if err != nil {
			log.Fatalf("Could not start stream mission events: %v", err)
		}

		// Receive the initial message with a null event and with a last message ID to reconnect whenever needed.
		initialMsg, err := streamMissionEvents.Recv()
		if err != nil {
			log.Fatalf("Could not receive initial message.")
		}

		lastReceivedMessageId := initialMsg.MessageId

		for {
			select {
			case <-quit:
				// close stream on shutdown signal
				err := streamMissionEvents.CloseSend()
				if err != nil {
					log.Fatalf("Could not close stream: %v.", err)
				}
				return
			default:
				msg, err := streamMissionEvents.Recv()

				if err == io.EOF || status.Code(err) == codes.DeadlineExceeded {
					// either server closed stream or timeout reached
					// reconnect with latest known message id to replay events from this point forward
					log.Println("Mission EventSream closed. Reconnecting...")
					ctxMissionEventsStreaming, _ = context.WithTimeout(context.Background(), streamingTimeout)
					streamMissionEvents, err = fleetStateClient.SubscribeToFleetMissionEvents(
						ctxMissionEventsStreaming,
						&fleetstatev1beta4.SubscribeToFleetMissionEventsRequest{
							FleetId:       firstFleetId,
							LastMessageId: &lastReceivedMessageId,
						},
					)
					if err != nil {
						log.Fatalf("Could not receive initial message: %v.", err)
					}
				} else if err != nil {
					log.Fatalf("Unexpected error occured: %v", err)
				} else {
					// custom handling of mission progress event, add your consumption logic here
					handleMissionEventMessage(testVehicleId, msg)
					// save the id of the last processed message to reconnect whenever needed
					lastReceivedMessageId = msg.MessageId
				}
			}
		}
	}()
	// End Section

	time.Sleep(2 * time.Minute)
	// shutdown client
	log.Println("Shutdown ")
	quit <- true
}

func handleMissionEventMessage(vehicleId string, msg *fleetstatev1beta4.SubscribeToFleetMissionEventsResponse) {
	log.Printf("Vehicle[%v] Mission Event: %v", vehicleId, protojson.MarshalOptions{Multiline: true, EmitUnpopulated: true}.Format(msg))
}

func handleVehicleTelemetryMessage(vehicleId string, msg *fleetstatev1beta4.SubscribeToVehicleTelemetryMovementResponse) {
	log.Printf("Vehicle[%v] Movement: %v", vehicleId, protojson.MarshalOptions{Multiline: true, EmitUnpopulated: true}.Format(msg))
}
import * as grpc from "nice-grpc";
import { FleetStateServiceDefinition } from "./fleet_state";
import {isAbortError} from 'abort-controller-x';
import {deadlineMiddleware} from 'nice-grpc-client-middleware-deadline';
import * as http2 from "http2";

const streamingTimeoutInMs = 30 * 1000 // 30s 

async function main() {
    // Section: Authentication
    const clientId = process.env.CLIENT_ID;
    const clientSecret = process.env.CLIENT_SECRET;
    const tokenUrl =
        "https://fleet-api.int.eu-central-1.moia-group.io/" +
        "auth/oauth/token";
    const testVehicleId = "819f0cda-62dd-4251-9efd-9c4294b4f42d";

    const authHeader = Buffer.from(`${clientId}:${clientSecret}`).toString(
        "base64"
    );

    const http2AuthClient = http2.connect(tokenUrl);

    const tokenRequest = http2AuthClient.request({
        ":method": "POST",
        ":path": "/auth/oauth/token",
        "authorization": `Basic ${authHeader}`,
        "content-type": "application/x-www-form-urlencoded",
    });

    tokenRequest.setEncoding("utf8");

    const requestBody = "grant_type=client_credentials";

    tokenRequest.write(requestBody);
    tokenRequest.end();

    let tokenResponse = "";

    for await (const data of tokenRequest) {
        tokenResponse += data;
    }

    const parsedTokenResponse = JSON.parse(tokenResponse) as Record<string, any>;

    if (parsedTokenResponse.error) {
        console.error(parsedTokenResponse.error_description);
        return;
    }

    http2AuthClient.close();

    // Used in the next steps
    const accessToken = parsedTokenResponse.access_token;
    // End Section

    // Section: Connect to the Fleet API
    const apiUrl = "fleet-api.int.eu-central-1.moia-group.io:443";

    const clientFactory = grpc.createClientFactory().use(deadlineMiddleware);

    const channel = grpc.createChannel(
        apiUrl,
        grpc.ChannelCredentials.createSsl()
    );

    const client = clientFactory.create(FleetStateServiceDefinition, channel, {
        "*": {
            metadata: new grpc.Metadata({
                Authorization: `Bearer ${accessToken}`,
            }),
        },
    });
    // End Section

    // Section: Use fleet API 1
    const fleets = await client.listFleets({});
    console.log(JSON.stringify(fleets, null, 4));
    // End Section

    // Section: Use fleet API 2
    const firstFleetId = fleets.fleets[0].id;

    const vehiclesPage1 = await client.listVehicles({
        fleetId: firstFleetId,
        pageSize: 2,
    });
    console.log(JSON.stringify(vehiclesPage1, null, 4));
    // End Section

    // Section: Use fleet API 3
    if (vehiclesPage1.nextPageToken === "") {
        console.log("No more vehicles");
    } else {
        const vehiclesPage2 = await client.listVehicles({
            fleetId: fleets.fleets[0].id,
            pageToken: vehiclesPage1.nextPageToken,
        });
        console.log(JSON.stringify(vehiclesPage2, null, 4));    
    }
    // End Section

    // Section: Use fleet API 4 
    const abortControllerStreams = new AbortController();
    const abortSignal = abortControllerStreams.signal

    async function streamVehicleTelemetry() {
        while(true) {
            try {
                const streamVehicleTelemetryMovement = client.subscribeToVehicleTelemetryMovement({
                    vehicleId: testVehicleId,
                }, { signal: abortSignal, deadline: streamingTimeoutInMs}); 
            
                for await (const telemetryMovement of streamVehicleTelemetryMovement) {
                    // handling of telemetry event, add your consumption logic here
                    console.log(JSON.stringify(telemetryMovement, null, 4));
                }
        
            } catch (error: unknown) {
                if (error instanceof grpc.ClientError && error.code === grpc.Status.DEADLINE_EXCEEDED) {
                    console.log("VehicleTelemetry Stream closed. Reconnecting...")
                } else if (isAbortError(error)) {
                    console.log("Received aborting signal, shutdown vehicle telemetry stream.");
                    return
                } else {
                    console.error(error);
                    throw error;
                }
            }
        }         
    }
    streamVehicleTelemetry();
    // End Section

    // Section: Use fleet API 5

    async function streamVehicleMissionEvents() {
        // When first subscribing to the stream we do not specify a lastMessageId
        let lastKnownMessageId: undefined | string = undefined;
        while(true) {
            try {
                const streamVehicleMissionEvent = client.subscribeToFleetMissionEvents({
                    fleetId: firstFleetId,
                    lastMessageId: lastKnownMessageId,
                }, { signal: abortSignal, deadline: streamingTimeoutInMs }); 
            
                for await (const missionEvent of streamVehicleMissionEvent) {
                    // handling of mission progress event, add your consumption logic here
                    console.log(JSON.stringify(missionEvent, null, 4));

					// save the id of the last processed message to reconnect whenever needed
                    lastKnownMessageId = missionEvent.messageId
                }
        
            } catch (error: unknown) {
                if (error instanceof grpc.ClientError && error.code === grpc.Status.DEADLINE_EXCEEDED) {
                    console.log("MissionEvent Stream closed. Reconnecting...")
                } else if (isAbortError(error)) {
                    console.log("Received aborting signal, shutdown mission event stream.");
                    return;
                } else {
                    console.error(error);
                    throw error;
                }
            }
        }             
    }
    streamVehicleMissionEvents()
    // End Section

    await new Promise(f => setTimeout(f, 2*60*1000)); // sleep for 3 minutes 
    console.log('Shutdown client...')
    abortControllerStreams.abort()
    channel.close()
}

main()