Skip to content
On this page

Events

goes defines and implements a generic event system that is used as the building block for aggregates, commands, and projections.

Event System

The event system consists of 3 interfaces – event.Event / event.Of[T], event.Bus, and event.Store. All other components of goes are built on top of this system.

View type definitions

Kinds of Events

An event can be either be a "normal" event, or an Aggregate Event, differing only in the data they provide. Normal events provide a unique id, the event name, event time, and some arbitrary Event Data. Aggregate Events additionally provide the position of the event within the event stream of an aggregate.

Backends

goes provides backend implementations for the event.Bus and event.Store interfaces:

Event Bus

Event Store

Type Definitions

Event

go
// Event is an event with arbitrary data.
type Event = Of[any]

// Of is an event with the given specific data type. An event has a unique id,
// a name, user-provided event data, and the time at which the event was raised.
//
// If the Aggregate method of an event returns non-zero values, the event is
// considered to belong to the event stream of that aggregate:
//
//	var evt event.Event
//	id, name, version := evt.Aggregate()
//	// id is the UUID of the aggregate that the event belongs to
//	// name is the name of the aggregate that the event belongs to
//	// version is the optimistic concurrency version of the event within the
//	// event stream of the aggregate
//
// If an event is not part of an aggregate, the Aggregate method should return
// only zero values.
//
// Use the New function to create an event:
//
//	evt := event.New("foo", 3)
//	// evt.Name() == "foo"
//	// evt.Data() == 3
//	// evt.Time() == time.Now()
//
// To create an event for an aggregate, use the Aggregate() option:
//
//	var aggregateID uuid.UUID
//	var aggregateName string
//	var aggregateVersion int
//	evt := event.New("foo", 3, event.Aggregate(aggregateID, aggregateName, aggregateVersion))
type Of[Data any] interface {
	// ID returns the id of the event.
	ID() uuid.UUID
	// Name returns the name of the event.
	Name() string
	// Time returns the time of the event.
	Time() time.Time
	// Data returns the event data.
	Data() Data

	// Aggregate returns the id, name and version of the aggregate that the
	// event belongs to. aggregate should return zero values if the event is not
	// an aggregate event.
	Aggregate() (id uuid.UUID, name string, version int)
}

Bus

go
// Bus is the pub-sub client for events.
type Bus interface {
	Publisher
	Subscriber
}

// A Publisher allows to publish events to subscribers of these events.
type Publisher interface {
	// Publish publishes events. Each event is sent to all subscribers of the event.
	Publish(ctx context.Context, events ...Event) error
}

// A Subscriber allows to subscribe to events.
type Subscriber interface {
	// Subscribe subscribes to events with the given names and returns two
	// channels – one for the received events and one for any asynchronous
	// errors that occur during the subscription. If Subscribe fails to
	// subscribe to all events, nil channels and an error are returned instead.
	//
	// When the provided context is canceled, the subscription is also canceled
	// and the returned channels are closed.
	Subscribe(ctx context.Context, names ...string) (<-chan Event, <-chan error, error)
}

Store

go
// A Store provides persistence for events.
type Store interface {
	// Insert inserts events into the store.
	Insert(context.Context, ...Event) error

	// Find fetches the given event from the store.
	Find(context.Context, uuid.UUID) (Event, error)

	// Query queries the store for events and returns two channels – one for the
	// returned events and one for any asynchronous errors that occur during the
	// query.
	//
	//	var store event.Store
	//	events, errs, err := store.Query(context.TODO(), query.New(...))
	//	// handle err
	//	err := streams.Walk(context.TODO(), func(evt event.Event) {
	//		log.Println(fmt.Sprintf("Queried event: %s", evt.Name()))
	//	}, events, errs)
	//	// handle err
	Query(context.Context, Query) (<-chan Event, <-chan error, error)

	// Delete deletes events from the store.
	Delete(context.Context, ...Event) error
}