Skip to content

Commit 33824a2

Browse files
terminal: separate session RPC server init & start
Similar to how the previous commit separated the initialization and starting of the account RPC server, this commit separates the initialization and starting of the sessions RPC server.
1 parent 06cd73f commit 33824a2

File tree

2 files changed

+131
-76
lines changed

2 files changed

+131
-76
lines changed

session_rpcserver.go

Lines changed: 98 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"fmt"
99
"strings"
1010
"sync"
11+
"sync/atomic"
1112
"time"
1213

1314
"github.com/btcsuite/btcd/btcec/v2"
@@ -40,8 +41,17 @@ import (
4041
// other special cases.
4142
const readOnlyAction = "***readonly***"
4243

44+
var (
45+
// ErrServerNotActive indicates that the server has started but hasn't
46+
// fully finished the startup process.
47+
ErrServerNotActive = errors.New("session server is still in the " +
48+
"process of starting")
49+
)
50+
4351
// sessionRpcServer is the gRPC server for the Session RPC interface.
4452
type sessionRpcServer struct {
53+
active atomic.Bool
54+
4555
litrpc.UnimplementedSessionsServer
4656
litrpc.UnimplementedFirewallServer
4757
litrpc.UnimplementedAutopilotServer
@@ -70,41 +80,10 @@ type sessionRpcServerConfig struct {
7080
privMap firewalldb.PrivacyMapper
7181
}
7282

73-
// newSessionRPCServer creates a new sessionRpcServer using the passed config.
74-
func newSessionRPCServer(cfg *sessionRpcServerConfig) (*sessionRpcServer,
75-
error) {
76-
77-
// Create the gRPC server that handles adding/removing sessions and the
78-
// actual mailbox server that spins up the Terminal Connect server
79-
// interface.
80-
server := session.NewServer(
81-
func(id session.ID, opts ...grpc.ServerOption) *grpc.Server {
82-
// Add the session ID injector interceptors first so
83-
// that the session ID is available in the context of
84-
// all interceptors that come after.
85-
allOpts := []grpc.ServerOption{
86-
addSessionIDToStreamCtx(id),
87-
addSessionIDToUnaryCtx(id),
88-
}
89-
90-
allOpts = append(allOpts, cfg.grpcOptions...)
91-
allOpts = append(allOpts, opts...)
92-
93-
// Construct the gRPC server with the options.
94-
grpcServer := grpc.NewServer(allOpts...)
95-
96-
// Register various grpc servers with the LNC session
97-
// server.
98-
cfg.registerGrpcServers(grpcServer)
99-
100-
return grpcServer
101-
},
102-
)
103-
83+
// newSessionRPCServer creates a new sessionRpcServer.
84+
func newSessionRPCServer() (*sessionRpcServer, error) {
10485
return &sessionRpcServer{
105-
cfg: cfg,
106-
sessionServer: server,
107-
quit: make(chan struct{}),
86+
quit: make(chan struct{}),
10887
}, nil
10988
}
11089

@@ -164,9 +143,52 @@ func addSessionIDToUnaryCtx(id session.ID) grpc.ServerOption {
164143
})
165144
}
166145

167-
// start all the components necessary for the sessionRpcServer to start serving
168-
// requests. This includes resuming all non-revoked sessions.
169-
func (s *sessionRpcServer) start(ctx context.Context) error {
146+
// started returns true if the server has been started, and false otherwise.
147+
// NOTE: This function is safe for concurrent access.
148+
func (s *sessionRpcServer) started() bool {
149+
return s.active.Load()
150+
}
151+
152+
// start starts a new sessionRpcServer using the passed config, and adds all
153+
// components necessary for the sessionRpcServer to start serving requests. This
154+
// includes resuming all non-revoked sessions.
155+
func (s *sessionRpcServer) start(ctx context.Context,
156+
cfg *sessionRpcServerConfig) error {
157+
158+
if s.active.Swap(true) {
159+
return errors.New("session rpc server is already started")
160+
}
161+
162+
// Create the gRPC server that handles adding/removing sessions and the
163+
// actual mailbox server that spins up the Terminal Connect server
164+
// interface.
165+
server := session.NewServer(
166+
func(id session.ID, opts ...grpc.ServerOption) *grpc.Server {
167+
// Add the session ID injector interceptors first so
168+
// that the session ID is available in the context of
169+
// all interceptors that come after.
170+
allOpts := []grpc.ServerOption{
171+
addSessionIDToStreamCtx(id),
172+
addSessionIDToUnaryCtx(id),
173+
}
174+
175+
allOpts = append(allOpts, cfg.grpcOptions...)
176+
allOpts = append(allOpts, opts...)
177+
178+
// Construct the gRPC server with the options.
179+
grpcServer := grpc.NewServer(allOpts...)
180+
181+
// Register various grpc servers with the LNC session
182+
// server.
183+
cfg.registerGrpcServers(grpcServer)
184+
185+
return grpcServer
186+
},
187+
)
188+
189+
s.cfg = cfg
190+
s.sessionServer = server
191+
170192
// Delete all sessions in the Reserved state.
171193
err := s.cfg.db.DeleteReservedSessions(ctx)
172194
if err != nil {
@@ -255,7 +277,9 @@ func (s *sessionRpcServer) start(ctx context.Context) error {
255277
func (s *sessionRpcServer) stop() error {
256278
var returnErr error
257279
s.stopOnce.Do(func() {
258-
s.sessionServer.Stop()
280+
if s.sessionServer != nil {
281+
s.sessionServer.Stop()
282+
}
259283

260284
close(s.quit)
261285
s.wg.Wait()
@@ -268,6 +292,10 @@ func (s *sessionRpcServer) stop() error {
268292
func (s *sessionRpcServer) AddSession(ctx context.Context,
269293
req *litrpc.AddSessionRequest) (*litrpc.AddSessionResponse, error) {
270294

295+
if !s.started() {
296+
return nil, ErrServerNotActive
297+
}
298+
271299
expiry := time.Unix(int64(req.ExpiryTimestampSeconds), 0)
272300
if time.Now().After(expiry) {
273301
return nil, fmt.Errorf("expiry must be in the future")
@@ -618,6 +646,10 @@ func (s *sessionRpcServer) resumeSession(ctx context.Context,
618646
func (s *sessionRpcServer) ListSessions(ctx context.Context,
619647
_ *litrpc.ListSessionsRequest) (*litrpc.ListSessionsResponse, error) {
620648

649+
if !s.started() {
650+
return nil, ErrServerNotActive
651+
}
652+
621653
sessions, err := s.cfg.db.ListAllSessions(ctx)
622654
if err != nil {
623655
return nil, fmt.Errorf("error fetching sessions: %v", err)
@@ -642,6 +674,10 @@ func (s *sessionRpcServer) ListSessions(ctx context.Context,
642674
func (s *sessionRpcServer) RevokeSession(ctx context.Context,
643675
req *litrpc.RevokeSessionRequest) (*litrpc.RevokeSessionResponse, error) {
644676

677+
if !s.started() {
678+
return nil, ErrServerNotActive
679+
}
680+
645681
pubKey, err := btcec.ParsePubKey(req.LocalPublicKey)
646682
if err != nil {
647683
return nil, fmt.Errorf("error parsing public key: %v", err)
@@ -676,6 +712,10 @@ func (s *sessionRpcServer) PrivacyMapConversion(ctx context.Context,
676712
req *litrpc.PrivacyMapConversionRequest) (
677713
*litrpc.PrivacyMapConversionResponse, error) {
678714

715+
if !s.started() {
716+
return nil, ErrServerNotActive
717+
}
718+
679719
var (
680720
groupID session.ID
681721
err error
@@ -733,6 +773,10 @@ func (s *sessionRpcServer) PrivacyMapConversion(ctx context.Context,
733773
func (s *sessionRpcServer) ListActions(ctx context.Context,
734774
req *litrpc.ListActionsRequest) (*litrpc.ListActionsResponse, error) {
735775

776+
if !s.started() {
777+
return nil, ErrServerNotActive
778+
}
779+
736780
// If no maximum number of actions is given, use a default of 100.
737781
if req.MaxNumActions == 0 {
738782
req.MaxNumActions = 100
@@ -841,6 +885,10 @@ func (s *sessionRpcServer) ListAutopilotFeatures(ctx context.Context,
841885
_ *litrpc.ListAutopilotFeaturesRequest) (
842886
*litrpc.ListAutopilotFeaturesResponse, error) {
843887

888+
if !s.started() {
889+
return nil, ErrServerNotActive
890+
}
891+
844892
fs, err := s.cfg.autopilot.ListFeatures(ctx)
845893
if err != nil {
846894
return nil, err
@@ -884,6 +932,10 @@ func (s *sessionRpcServer) AddAutopilotSession(ctx context.Context,
884932
req *litrpc.AddAutopilotSessionRequest) (
885933
*litrpc.AddAutopilotSessionResponse, error) {
886934

935+
if !s.started() {
936+
return nil, ErrServerNotActive
937+
}
938+
887939
if len(req.Features) == 0 {
888940
return nil, fmt.Errorf("must include at least one feature")
889941
}
@@ -1325,6 +1377,10 @@ func (s *sessionRpcServer) ListAutopilotSessions(ctx context.Context,
13251377
_ *litrpc.ListAutopilotSessionsRequest) (
13261378
*litrpc.ListAutopilotSessionsResponse, error) {
13271379

1380+
if !s.started() {
1381+
return nil, ErrServerNotActive
1382+
}
1383+
13281384
sessions, err := s.cfg.db.ListSessionsByType(ctx, session.TypeAutopilot)
13291385
if err != nil {
13301386
return nil, fmt.Errorf("error fetching sessions: %v", err)
@@ -1349,6 +1405,10 @@ func (s *sessionRpcServer) RevokeAutopilotSession(ctx context.Context,
13491405
req *litrpc.RevokeAutopilotSessionRequest) (
13501406
*litrpc.RevokeAutopilotSessionResponse, error) {
13511407

1408+
if !s.started() {
1409+
return nil, ErrServerNotActive
1410+
}
1411+
13521412
pubKey, err := btcec.ParsePubKey(req.LocalPublicKey)
13531413
if err != nil {
13541414
return nil, fmt.Errorf("error parsing public key: %v", err)

terminal.go

Lines changed: 33 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -463,14 +463,6 @@ func (g *LightningTerminal) start(ctx context.Context) error {
463463
return fmt.Errorf("error creating account service: %v", err)
464464
}
465465

466-
superMacBaker := func(ctx context.Context, rootKeyID uint64,
467-
perms []bakery.Op, caveats []macaroon.Caveat) (string, error) {
468-
469-
return litmac.BakeSuperMacaroon(
470-
ctx, g.basicClient, rootKeyID, perms, caveats,
471-
)
472-
}
473-
474466
g.accountRpcServer = accounts.NewRPCServer()
475467

476468
g.ruleMgrs = rules.NewRuleManagerSet()
@@ -504,35 +496,7 @@ func (g *LightningTerminal) start(ctx context.Context) error {
504496
}
505497
}
506498

507-
g.sessionRpcServer, err = newSessionRPCServer(&sessionRpcServerConfig{
508-
db: g.stores.sessions,
509-
basicAuth: g.rpcProxy.basicAuth,
510-
grpcOptions: []grpc.ServerOption{
511-
grpc.CustomCodec(grpcProxy.Codec()), // nolint: staticcheck,
512-
grpc.ChainStreamInterceptor(
513-
g.rpcProxy.StreamServerInterceptor,
514-
),
515-
grpc.ChainUnaryInterceptor(
516-
g.rpcProxy.UnaryServerInterceptor,
517-
),
518-
grpc.UnknownServiceHandler(
519-
grpcProxy.TransparentHandler(
520-
// Don't allow calls to litrpc.
521-
g.rpcProxy.makeDirector(false),
522-
),
523-
),
524-
},
525-
registerGrpcServers: func(server *grpc.Server) {
526-
g.registerSubDaemonGrpcServers(server, true)
527-
},
528-
superMacBaker: superMacBaker,
529-
firstConnectionDeadline: g.cfg.FirstLNCConnDeadline,
530-
permMgr: g.permsMgr,
531-
actionsDB: g.stores.firewall,
532-
autopilot: g.autopilotClient,
533-
ruleMgrs: g.ruleMgrs,
534-
privMap: g.stores.firewall,
535-
})
499+
g.sessionRpcServer, err = newSessionRPCServer()
536500
if err != nil {
537501
return fmt.Errorf("could not create new session rpc "+
538502
"server: %v", err)
@@ -1055,7 +1019,38 @@ func (g *LightningTerminal) startInternalSubServers(ctx context.Context,
10551019
}
10561020

10571021
log.Infof("Starting LiT session server")
1058-
if err = g.sessionRpcServer.start(ctx); err != nil {
1022+
1023+
sessionCfg := &sessionRpcServerConfig{
1024+
db: g.stores.sessions,
1025+
basicAuth: g.rpcProxy.basicAuth,
1026+
grpcOptions: []grpc.ServerOption{
1027+
grpc.CustomCodec(grpcProxy.Codec()), // nolint: staticcheck,
1028+
grpc.ChainStreamInterceptor(
1029+
g.rpcProxy.StreamServerInterceptor,
1030+
),
1031+
grpc.ChainUnaryInterceptor(
1032+
g.rpcProxy.UnaryServerInterceptor,
1033+
),
1034+
grpc.UnknownServiceHandler(
1035+
grpcProxy.TransparentHandler(
1036+
// Don't allow calls to litrpc.
1037+
g.rpcProxy.makeDirector(false),
1038+
),
1039+
),
1040+
},
1041+
registerGrpcServers: func(server *grpc.Server) {
1042+
g.registerSubDaemonGrpcServers(server, true)
1043+
},
1044+
superMacBaker: superMacBaker,
1045+
firstConnectionDeadline: g.cfg.FirstLNCConnDeadline,
1046+
permMgr: g.permsMgr,
1047+
actionsDB: g.stores.firewall,
1048+
autopilot: g.autopilotClient,
1049+
ruleMgrs: g.ruleMgrs,
1050+
privMap: g.stores.firewall,
1051+
}
1052+
1053+
if err = g.sessionRpcServer.start(ctx, sessionCfg); err != nil {
10591054
return err
10601055
}
10611056
g.sessionRpcServerStarted = true

0 commit comments

Comments
 (0)