diff --git a/accounts/rpcserver.go b/accounts/rpcserver.go index ce1291c17..36dd74430 100644 --- a/accounts/rpcserver.go +++ b/accounts/rpcserver.go @@ -3,6 +3,7 @@ package accounts import ( "context" "encoding/hex" + "errors" "fmt" "time" @@ -16,6 +17,13 @@ import ( "gopkg.in/macaroon.v2" ) +var ( + // ErrServerNotActive indicates that the server has started but hasn't + // fully finished the startup process. + ErrServerNotActive = errors.New("accounts server is still in the " + + "process of starting") +) + // RPCServer is the main server that implements the Accounts gRPC service. type RPCServer struct { litrpc.UnimplementedAccountsServer @@ -26,13 +34,17 @@ type RPCServer struct { } // NewRPCServer returns a new RPC server for the given service. -func NewRPCServer(service *InterceptorService, - superMacBaker litmac.Baker) *RPCServer { +func NewRPCServer() *RPCServer { + return &RPCServer{} +} - return &RPCServer{ - service: service, - superMacBaker: superMacBaker, - } +// Start adds the necessary dependencies for the RPCServer to be able to process +// requests, and starts the RPCServer. +func (s *RPCServer) Start(service *InterceptorService, + superMacBaker litmac.Baker) { + + s.service = service + s.superMacBaker = superMacBaker } // CreateAccount adds an entry to the account database. This entry represents diff --git a/session_rpcserver.go b/session_rpcserver.go index cb57b847b..b88cea053 100644 --- a/session_rpcserver.go +++ b/session_rpcserver.go @@ -40,6 +40,13 @@ import ( // other special cases. const readOnlyAction = "***readonly***" +var ( + // ErrServerNotActive indicates that the server has started but hasn't + // fully finished the startup process. + ErrServerNotActive = errors.New("session server is still in the " + + "process of starting") +) + // sessionRpcServer is the gRPC server for the Session RPC interface. type sessionRpcServer struct { litrpc.UnimplementedSessionsServer @@ -70,42 +77,11 @@ type sessionRpcServerConfig struct { privMap firewalldb.PrivacyMapper } -// newSessionRPCServer creates a new sessionRpcServer using the passed config. -func newSessionRPCServer(cfg *sessionRpcServerConfig) (*sessionRpcServer, - error) { - - // Create the gRPC server that handles adding/removing sessions and the - // actual mailbox server that spins up the Terminal Connect server - // interface. - server := session.NewServer( - func(id session.ID, opts ...grpc.ServerOption) *grpc.Server { - // Add the session ID injector interceptors first so - // that the session ID is available in the context of - // all interceptors that come after. - allOpts := []grpc.ServerOption{ - addSessionIDToStreamCtx(id), - addSessionIDToUnaryCtx(id), - } - - allOpts = append(allOpts, cfg.grpcOptions...) - allOpts = append(allOpts, opts...) - - // Construct the gRPC server with the options. - grpcServer := grpc.NewServer(allOpts...) - - // Register various grpc servers with the LNC session - // server. - cfg.registerGrpcServers(grpcServer) - - return grpcServer - }, - ) - +// newSessionRPCServer creates a new sessionRpcServer. +func newSessionRPCServer() *sessionRpcServer { return &sessionRpcServer{ - cfg: cfg, - sessionServer: server, - quit: make(chan struct{}), - }, nil + quit: make(chan struct{}), + } } // wrappedServerStream is a wrapper around the grpc.ServerStream that allows us @@ -164,9 +140,42 @@ func addSessionIDToUnaryCtx(id session.ID) grpc.ServerOption { }) } -// start all the components necessary for the sessionRpcServer to start serving -// requests. This includes resuming all non-revoked sessions. -func (s *sessionRpcServer) start(ctx context.Context) error { +// start starts a new sessionRpcServer using the passed config, and adds all +// components necessary for the sessionRpcServer to start serving requests. This +// includes resuming all non-revoked sessions. +func (s *sessionRpcServer) start(ctx context.Context, + cfg *sessionRpcServerConfig) error { + + // Create the gRPC server that handles adding/removing sessions and the + // actual mailbox server that spins up the Terminal Connect server + // interface. + server := session.NewServer( + func(id session.ID, opts ...grpc.ServerOption) *grpc.Server { + // Add the session ID injector interceptors first so + // that the session ID is available in the context of + // all interceptors that come after. + allOpts := []grpc.ServerOption{ + addSessionIDToStreamCtx(id), + addSessionIDToUnaryCtx(id), + } + + allOpts = append(allOpts, cfg.grpcOptions...) + allOpts = append(allOpts, opts...) + + // Construct the gRPC server with the options. + grpcServer := grpc.NewServer(allOpts...) + + // Register various grpc servers with the LNC session + // server. + cfg.registerGrpcServers(grpcServer) + + return grpcServer + }, + ) + + s.cfg = cfg + s.sessionServer = server + // Delete all sessions in the Reserved state. err := s.cfg.db.DeleteReservedSessions(ctx) if err != nil { @@ -255,7 +264,9 @@ func (s *sessionRpcServer) start(ctx context.Context) error { func (s *sessionRpcServer) stop() error { var returnErr error s.stopOnce.Do(func() { - s.sessionServer.Stop() + if s.sessionServer != nil { + s.sessionServer.Stop() + } close(s.quit) s.wg.Wait() diff --git a/terminal.go b/terminal.go index 7e4d552c7..4194eae1e 100644 --- a/terminal.go +++ b/terminal.go @@ -447,33 +447,15 @@ func (g *LightningTerminal) start(ctx context.Context) error { return fmt.Errorf("could not create network directory: %v", err) } - g.stores, err = NewStores(g.cfg, clock.NewDefaultClock()) - if err != nil { - return fmt.Errorf("could not create stores: %v", err) - } - - if err := g.stores.firewall.Start(ctx); err != nil { - return fmt.Errorf("could not start firewall DB: %v", err) - } - - g.accountService, err = accounts.NewService( - g.stores.accounts, accountServiceErrCallback, - ) - if err != nil { - return fmt.Errorf("error creating account service: %v", err) - } - - superMacBaker := func(ctx context.Context, rootKeyID uint64, - perms []bakery.Op, caveats []macaroon.Caveat) (string, error) { - - return litmac.BakeSuperMacaroon( - ctx, g.basicClient, rootKeyID, perms, caveats, - ) - } - - g.accountRpcServer = accounts.NewRPCServer( - g.accountService, superMacBaker, - ) + // We create a reference to the `accountRpcServer` here before starting + // it and prior to setting up the LND connection. This is because when + // the LND connection is set up for an integrated LND instance, LND will + // call litd's `RegisterGrpcSubserver` function during the setup of the + // connection. + // That function calls `registerSubDaemonGrpcServers` which requires + // that the `accountRpcServer` pointer exist, to not nil pointer panic + // when requests get passed to the server. + g.accountRpcServer = accounts.NewRPCServer() g.ruleMgrs = rules.NewRuleManagerSet() @@ -506,35 +488,11 @@ func (g *LightningTerminal) start(ctx context.Context) error { } } - g.sessionRpcServer, err = newSessionRPCServer(&sessionRpcServerConfig{ - db: g.stores.sessions, - basicAuth: g.rpcProxy.basicAuth, - grpcOptions: []grpc.ServerOption{ - grpc.CustomCodec(grpcProxy.Codec()), // nolint: staticcheck, - grpc.ChainStreamInterceptor( - g.rpcProxy.StreamServerInterceptor, - ), - grpc.ChainUnaryInterceptor( - g.rpcProxy.UnaryServerInterceptor, - ), - grpc.UnknownServiceHandler( - grpcProxy.TransparentHandler( - // Don't allow calls to litrpc. - g.rpcProxy.makeDirector(false), - ), - ), - }, - registerGrpcServers: func(server *grpc.Server) { - g.registerSubDaemonGrpcServers(server, true) - }, - superMacBaker: superMacBaker, - firstConnectionDeadline: g.cfg.FirstLNCConnDeadline, - permMgr: g.permsMgr, - actionsDB: g.stores.firewall, - autopilot: g.autopilotClient, - ruleMgrs: g.ruleMgrs, - privMap: g.stores.firewall, - }) + // Similar to the init of the `accountRpcServer` reference above, we + // create a reference to the `sessionRpcServer` here before setting up + // the LND connection. See the comment above for the `accountRpcServer` + // to understand why this is necessary. + g.sessionRpcServer, err = newSessionRPCServer() if err != nil { return fmt.Errorf("could not create new session rpc "+ "server: %v", err) @@ -597,6 +555,9 @@ func (g *LightningTerminal) start(ctx context.Context) error { go func() { defer g.wg.Done() + // Note that LND will call litd's RegisterGrpcSubserver + // function during the execution of this, as `g` is + // referenced in the passed `implCfg` err := lnd.Main(g.cfg.Lnd, lisCfg, implCfg, interceptor) if e, ok := err.(*flags.Error); err != nil && (!ok || e.Type != flags.ErrHelp) { @@ -759,11 +720,49 @@ func (g *LightningTerminal) start(ctx context.Context) error { } } - // Set up all the LND clients required by LiT. - err = g.setUpLNDClients(ctx, lndQuit) + // Since we are now connected to LND, we can now set up a basic LND + // client. Note this doesn't require LND to be synced, but can still be + // used to fetch info from LND such as its macaroons. Therefore, it's ok + // set it up prior to setting up the stores and starting the other RPC + // servers, as the setup will be fast. + err = g.setupBasicLNDClient(ctx, lndQuit) + if err != nil { + g.statusMgr.SetErrored( + subservers.LND, + "could not to set up a basic LND client: %v", err, + ) + + return fmt.Errorf("could not start LND") + } + + g.stores, err = NewStores(g.cfg, clock.NewDefaultClock()) + if err != nil { + return fmt.Errorf("could not create stores: %v", err) + } + + if err := g.stores.firewall.Start(ctx); err != nil { + return fmt.Errorf("could not start firewall DB: %v", err) + } + + g.accountService, err = accounts.NewService( + g.stores.accounts, accountServiceErrCallback, + ) + if err != nil { + return fmt.Errorf("error creating account service: %v", err) + } + + if err != nil { + return fmt.Errorf("could not create new session rpc "+ + "server: %v", err) + } + + // Set up a full LND client. With this, we now have all LND clients + // needed for LiT to be fully started. + err = g.setupFullLNDClient(ctx, lndQuit) if err != nil { g.statusMgr.SetErrored( - subservers.LND, "could not set up LND clients: %v", err, + subservers.LND, + "could not to set up a full LND client: %v", err, ) return fmt.Errorf("could not start LND") @@ -819,13 +818,35 @@ func (g *LightningTerminal) basicLNDClient() (lnrpc.LightningClient, error) { return g.basicClient, nil } -// setUpLNDClients sets up the various LND clients required by LiT. -func (g *LightningTerminal) setUpLNDClients(ctx context.Context, +// checkRunning checks if we should continue running for the duration of the +// defaultStartupTimeout, or else returns an error indicating why a shut-down is +// needed. +func (g *LightningTerminal) checkRunning(ctx context.Context, + lndQuit chan struct{}) error { + + select { + case err := <-g.errQueue.ChanOut(): + return fmt.Errorf("error from subsystem: %v", err) + + case <-lndQuit: + return fmt.Errorf("LND has stopped") + + case <-ctx.Done(): + return ctx.Err() + + case <-time.After(g.cfg.LndConnectInterval): + return nil + } +} + +// setupBasicLNDClient sets up a basic LND client that can be used to connect to +// LND without requiring LND to be fully synced. Since this client is only a +// basic client, not all of LNDs functionality is available through it. +func (g *LightningTerminal) setupBasicLNDClient(ctx context.Context, lndQuit chan struct{}) error { var ( err error - insecure bool clientOptions []lndclient.BasicClientOption ) @@ -840,36 +861,13 @@ func (g *LightningTerminal) setUpLNDClients(ctx context.Context, // If we're in integrated mode, we can retrieve the macaroon string // from lnd directly, rather than grabbing it from disk. if g.cfg.LndMode == ModeIntegrated { - // Set to true in integrated mode, since we will not require tls - // when communicating with lnd via a bufconn. - insecure = true clientOptions = append(clientOptions, lndclient.Insecure()) } - // checkRunning checks if we should continue running for the duration of - // the defaultStartupTimeout, or else returns an error indicating why - // a shut-down is needed. - checkRunning := func() error { - select { - case err := <-g.errQueue.ChanOut(): - return fmt.Errorf("error from subsystem: %v", err) - - case <-lndQuit: - return fmt.Errorf("LND has stopped") - - case <-ctx.Done(): - return ctx.Err() - - case <-time.After(g.cfg.LndConnectInterval): - return nil - } - } - // The main RPC listener of lnd might need some time to start, it could // be that we run into a connection refused a few times. We use the // basic client connection to find out if the RPC server is started yet - // because that doesn't do anything else than just connect. We'll check - // if lnd is also ready to be used in the next step. + // because that doesn't do anything else than just connect. log.Infof("Connecting basic lnd client") for { @@ -891,7 +889,7 @@ func (g *LightningTerminal) setUpLNDClients(ctx context.Context, "Error when setting up basic LND Client: %v", err, ) - err = checkRunning() + err = g.checkRunning(ctx, lndQuit) if err != nil { return err } @@ -914,12 +912,34 @@ func (g *LightningTerminal) setUpLNDClients(ctx context.Context, g.cfg.statelessInitMode = macService.StatelessInit } - // Now we know that the connection itself is ready. But we also need to - // wait for two things: The chain notifier to be ready and the lnd - // wallet being fully synced to its chain backend. The chain notifier - // will always be ready first so if we instruct the lndclient to wait - // for the wallet sync, we should be fully ready to start all our - // subservers. This will just block until lnd signals readiness. + return nil +} + +// setupFullLNDClient connects a up a full LND client to LND. Note that the +// setup of this client will block until LND is fully synced and unlocked. +func (g *LightningTerminal) setupFullLNDClient(ctx context.Context, + lndQuit chan struct{}) error { + + var ( + err error + insecure bool + ) + + host, network, tlsPath, macPath, macData := g.cfg.lndConnectParams() + + if g.cfg.LndMode == ModeIntegrated { + // Ssince we will not require tls when communicating with lnd + // via a bufconn in integrated mode, we set the insecure flag + // to true. + insecure = true + } + + // When setting up a full LND client, we we need to wait for two things: + // The chain notifier to be ready and the lnd wallet being fully synced + // to its chain backend. The chain notifier will always be ready first + // so if we instruct the lndclient to wait for the wallet sync, we + // should be fully ready to start all our subservers. This will just + // block until lnd signals readiness. log.Infof("Connecting full lnd client") for { g.lndClient, err = lndclient.NewLndServices( @@ -952,7 +972,7 @@ func (g *LightningTerminal) setUpLNDClients(ctx context.Context, err, ) - err = checkRunning() + err = g.checkRunning(ctx, lndQuit) if err != nil { return err } @@ -1025,6 +1045,21 @@ func (g *LightningTerminal) startInternalSubServers(ctx context.Context, } g.macaroonServiceStarted = true + superMacBaker := func(ctx context.Context, rootKeyID uint64, + perms []bakery.Op, caveats []macaroon.Caveat) (string, error) { + + return litmac.BakeSuperMacaroon( + ctx, g.basicClient, rootKeyID, perms, caveats, + ) + } + + log.Infof("Starting LiT accounts server") + + err = g.accountRpcServer.Start(g.accountService, superMacBaker) + if err != nil { + return err + } + if !g.cfg.Autopilot.Disable { withLndVersion := func(cfg *autopilotserver.Config) { cfg.LndVersion = autopilotserver.Version{ @@ -1042,7 +1077,38 @@ func (g *LightningTerminal) startInternalSubServers(ctx context.Context, } log.Infof("Starting LiT session server") - if err = g.sessionRpcServer.start(ctx); err != nil { + + sessionCfg := &sessionRpcServerConfig{ + db: g.stores.sessions, + basicAuth: g.rpcProxy.basicAuth, + grpcOptions: []grpc.ServerOption{ + grpc.CustomCodec(grpcProxy.Codec()), // nolint: staticcheck, + grpc.ChainStreamInterceptor( + g.rpcProxy.StreamServerInterceptor, + ), + grpc.ChainUnaryInterceptor( + g.rpcProxy.UnaryServerInterceptor, + ), + grpc.UnknownServiceHandler( + grpcProxy.TransparentHandler( + // Don't allow calls to litrpc. + g.rpcProxy.makeDirector(false), + ), + ), + }, + registerGrpcServers: func(server *grpc.Server) { + g.registerSubDaemonGrpcServers(server, true) + }, + superMacBaker: superMacBaker, + firstConnectionDeadline: g.cfg.FirstLNCConnDeadline, + permMgr: g.permsMgr, + actionsDB: g.stores.firewall, + autopilot: g.autopilotClient, + ruleMgrs: g.ruleMgrs, + privMap: g.stores.firewall, + } + + if err = g.sessionRpcServer.start(ctx, sessionCfg); err != nil { return err } g.sessionRpcServerStarted = true