Manifest Service

Code Review

1). Provider Service Calls/Initiates the Manifest Service

Source code reference location

manifest, err := manifest.NewService(ctx, session, bus, cluster.HostnameService(), manifestConfig)
if err != nil {
session.Log().Error("creating manifest handler", "err", err)
cancel()
<-cluster.Done()
<-bidengine.Done()
<-bc.lc.Done()
return nil, err
}

2). Manifest Calls/Initiates an Event Bus to Monitor Lease Won Events

The NewService function called from provider/manifest/service.go subscribes to a RPC node event bus for new lease won processing.

Eventually the run method in this package is called with a service type passed in.

Source code reference location

func NewService(ctx context.Context, session session.Session, bus pubsub.Bus, hostnameService clustertypes.HostnameServiceClient, cfg ServiceConfig) (Service, error) {
session = session.ForModule("provider-manifest")
sub, err := bus.Subscribe()
if err != nil {
return nil, err
}
s := &service{
session: session,
bus: bus,
sub: sub,
statusch: make(chan chan<- *Status),
mreqch: make(chan manifestRequest),
activeCheckCh: make(chan isActiveCheck),
managers: make(map[string]*manager),
managerch: make(chan *manager),
lc: lifecycle.New(),
hostnameService: hostnameService,
config: cfg,
watchdogch: make(chan dtypes.DeploymentID),
watchdogs: make(map[dtypes.DeploymentID]*watchdog),
}
go s.lc.WatchContext(ctx)
go s.run()
return s, nil
}

3). Monitor Service Loop is Created to React to New Lease Won Events

Within the run function of provider/manifest/service.go an endless for loop monitors for events placed onto a channel. When a event is received for the RPC node event bus of type LeaseWon the handleLease method is called.

for {
select {
case err := <-s.lc.ShutdownRequest():
s.lc.ShutdownInitiated(err)
break loop
case ev := <-s.sub.Events():
switch ev := ev.(type) {
case event.LeaseWon:
if ev.LeaseID.GetProvider() != s.session.Provider().Address().String() {
continue
}
s.session.Log().Info("lease won", "lease", ev.LeaseID)
s.handleLease(ev, true)

The handleLease method determines if a manager is active for the deployment via the ensureManager method. The manifest manager logic exists in provider/manifest/manager.go and handles the validation/application of the manifest when received from the tenant send manifest operation.

func (s *service) handleLease(ev event.LeaseWon, isNew bool) {
// Only run this if configured to do so
if isNew && s.config.ManifestTimeout > time.Duration(0) {
// Create watchdog if it does not exist AND a manifest has not been received yet
if watchdog := s.watchdogs[ev.LeaseID.DeploymentID()]; watchdog == nil {
watchdog = newWatchdog(s.session, s.lc.ShuttingDown(), s.watchdogch, ev.LeaseID, s.config.ManifestTimeout)
s.watchdogs[ev.LeaseID.DeploymentID()] = watchdog
}
}
manager := s.ensureManager(ev.LeaseID.DeploymentID())
....
}

New Manifest Manager instance is initiated by calling the newManager function in provider/manifest/manager.go with the service type and deployment ID (DSEQ) passed in as arguments.

func (s *service) ensureManager(did dtypes.DeploymentID) (manager *manager) {
manager = s.managers[dquery.DeploymentPath(did)]
if manager == nil {
manager = newManager(s, did)
s.managers[dquery.DeploymentPath(did)] = manager
}
return manager
}

4). Manifest Manager Logic

Source code reference location

The newManager function calls the run method - passing in the manager struct that includes the leasech channel - which invokes a perpetual for loop to await events on various channels.

The manifest manager instance is returned to the handleLease method in service.go.

func newManager(h *service, daddr dtypes.DeploymentID) *manager {
session := h.session.ForModule("manifest-manager")
...
go m.lc.WatchChannel(h.lc.ShuttingDown())
go m.run(h.managerch)
return m
}

The handleLease method in service.go continues and calls another handleLease method in manager.go passing in lease events received on the bus.

func (s *service) handleLease(ev event.LeaseWon, isNew bool) {
....
manager := s.ensureManager(ev.LeaseID.DeploymentID())
manager.handleLease(ev)
}

The handleLease method in manager.go puts the event onto the leasech channel.

func (m *manager) handleLease(ev event.LeaseWon) {
select {
case m.leasech <- ev:
case <-m.lc.ShuttingDown():
m.log.Error("not running: handle manifest", "lease", ev.LeaseID)
}
}

When the manifest manager run method receives an event on the leasech channel the maybeFetchData method is called and results is placed onto the runch channel.

func (m *manager) run(donech chan<- *manager) {
..
loop:
for {
....
select {
....
case ev := <-m.leasech:
m.log.Info("new lease", "lease", ev.LeaseID)
m.clearFetched()
m.maybeScheduleStop()
runch = m.maybeFetchData(ctx, runch)

The maybeFetchData method attempts to fetch deployment and lease data with associated downstream logic.

func (m *manager) maybeFetchData(ctx context.Context, runch <-chan runner.Result) <-chan runner.Result {
if runch != nil {
return runch
}
if !m.fetched || time.Since(m.fetchedAt) > m.config.CachedResultMaxAge {
m.clearFetched()
return m.fetchData(ctx)
}
return runch
}

5). Receipt of Manifest from Tenant Send to Provider

A method of name Submit is included in provider/manifest/service.go which accepts incoming manifest sends from the deployer/tenant to the provider. The function is initiated via an incoming HTTP post detailed subsequently.

func (s *service) Submit(ctx context.Context, did dtypes.DeploymentID, mani manifest.Manifest) error {
....
select {
case <-ctx.Done():
return ctx.Err()
case s.mreqch <- req:
case <-s.lc.ShuttingDown():
return ErrNotRunning
case <-s.lc.Done():
return ErrNotRunning
}
...
}

The Submit method is called when a HTTP post - which contains the Akash manifest in the body - is received on an endpoint and handler written/registered in provider/gateway/rest/router.go.

HTTP Endpoint

drouter.HandleFunc("/manifest",
createManifestHandler(log, pclient.Manifest())).
Methods(http.MethodPut)
lrouter := router.PathPrefix(leasePathPrefix).Subrouter()
lrouter.Use(
requireOwner(),
requireLeaseID(),
)

Request Handler

Note the call of the Submit method which is the provider/manifest/service.go function shown prior. The Deployment ID and manifest are sent to Submit as received in the HTTP post from the tenant’s send manifest action following lease creation with a provider.

func createManifestHandler(log log.Logger, mclient pmanifest.Client) http.HandlerFunc {
....
if err := mclient.Submit(subctx, requestDeploymentID(req), mani); err != nil {
if errors.Is(err, manifestValidation.ErrInvalidManifest) {
http.Error(w, err.Error(), http.StatusUnprocessableEntity)
return
....
}
}
footer-logo-dark

© Akash Network 2024 The Akash Network Authors Documentation Distributed under CC BY 4.0

Open-source Apache 2.0 Licensed.

GitHub v0.20.0

Privacy