Skip to content
Snippets Groups Projects
Unverified Commit 49f9b02a authored by Levente Pap's avatar Levente Pap
Browse files

Merge branch 'feat/workerpool-monitor' into test/sync_debug_and_650

parents 1411ee69 2e07cdb3
No related branches found
No related tags found
No related merge requests found
Showing
with 198 additions and 137 deletions
......@@ -7,7 +7,6 @@ import (
"github.com/iotaledger/goshimmer/dapps/valuetransfers/packages/payload"
"github.com/iotaledger/goshimmer/packages/binary/messagelayer/message"
"github.com/iotaledger/goshimmer/packages/binary/messagelayer/messageparser"
"github.com/iotaledger/hive.go/async"
"github.com/iotaledger/hive.go/autopeering/peer"
)
......@@ -17,7 +16,6 @@ type SignatureFilter struct {
onRejectCallback func(message *message.Message, err error, peer *peer.Peer)
onAcceptCallbackMutex sync.RWMutex
onRejectCallbackMutex sync.RWMutex
workerPool async.WorkerPool
}
// NewSignatureFilter is the constructor of the MessageFilter.
......@@ -28,33 +26,31 @@ func NewSignatureFilter() *SignatureFilter {
// Filter get's called whenever a new message is received. It rejects the message, if the message is not a valid value
// message.
func (filter *SignatureFilter) Filter(message *message.Message, peer *peer.Peer) {
	// accept message if the message is not a value message (it will be checked by other filters)
	valuePayload := message.Payload()
	if valuePayload.Type() != payload.Type {
		filter.getAcceptCallback()(message, peer)
		return
	}

	// reject if the payload can not be casted to a ValuePayload (invalid payload)
	typeCastedValuePayload, ok := valuePayload.(*payload.Payload)
	if !ok {
		filter.getRejectCallback()(message, errors.New("invalid value message"), peer)
		return
	}

	// reject message if it contains a transaction with invalid signatures
	if !typeCastedValuePayload.Transaction().SignaturesValid() {
		filter.getRejectCallback()(message, errors.New("invalid transaction signatures"), peer)
		return
	}

	// if all previous checks passed: accept message
	filter.getAcceptCallback()(message, peer)
}
// OnAccept registers the given callback as the acceptance function of the filter.
......@@ -73,11 +69,6 @@ func (filter *SignatureFilter) OnReject(callback func(message *message.Message,
filter.onRejectCallback = callback
}
// Shutdown shuts down the filter.
func (filter *SignatureFilter) Shutdown() {
	// gracefully stop the worker pool that Filter submits its checks to,
	// letting already queued filter tasks finish before the pool stops
	filter.workerPool.ShutdownGracefully()
}
// getAcceptCallback returns the callback that is executed when a message passes the filter.
func (filter *SignatureFilter) getAcceptCallback() func(message *message.Message, peer *peer.Peer) {
filter.onAcceptCallbackMutex.RLock()
......
......@@ -4,10 +4,8 @@ import (
"fmt"
"sync"
"github.com/iotaledger/hive.go/async"
"github.com/iotaledger/hive.go/autopeering/peer"
"github.com/iotaledger/goshimmer/packages/binary/messagelayer/message"
"github.com/iotaledger/hive.go/autopeering/peer"
)
// ErrInvalidSignature is returned when a message contains an invalid signature.
......@@ -17,7 +15,6 @@ var ErrInvalidSignature = fmt.Errorf("invalid signature")
type MessageSignatureFilter struct {
onAcceptCallback func(msg *message.Message, peer *peer.Peer)
onRejectCallback func(msg *message.Message, err error, peer *peer.Peer)
workerPool async.WorkerPool
onAcceptCallbackMutex sync.RWMutex
onRejectCallbackMutex sync.RWMutex
......@@ -28,32 +25,30 @@ func NewMessageSignatureFilter() *MessageSignatureFilter {
return &MessageSignatureFilter{}
}
// Filter filters up on the given bytes and peer and calls the acceptance callback
// if the input passes or the rejection callback if the input is rejected.
func (filter *MessageSignatureFilter) Filter(msg *message.Message, peer *peer.Peer) {
	// accept the message only when its signature verifies; reject otherwise
	if msg.VerifySignature() {
		filter.getAcceptCallback()(msg, peer)
		return
	}
	filter.getRejectCallback()(msg, ErrInvalidSignature, peer)
}
// OnAccept registers the given callback as the acceptance function of the filter.
func (filter *MessageSignatureFilter) OnAccept(callback func(msg *message.Message, peer *peer.Peer)) {
	// swap the acceptance callback under the write lock
	filter.onAcceptCallbackMutex.Lock()
	defer filter.onAcceptCallbackMutex.Unlock()
	filter.onAcceptCallback = callback
}
// OnReject registers the given callback as the rejection function of the filter.
func (filter *MessageSignatureFilter) OnReject(callback func(msg *message.Message, err error, peer *peer.Peer)) {
	// swap the rejection callback under the write lock
	filter.onRejectCallbackMutex.Lock()
	defer filter.onRejectCallbackMutex.Unlock()
	filter.onRejectCallback = callback
}
// Shutdown shuts down the filter by gracefully stopping its worker pool.
func (filter *MessageSignatureFilter) Shutdown() {
	filter.workerPool.ShutdownGracefully()
}
func (filter *MessageSignatureFilter) getAcceptCallback() (result func(msg *message.Message, peer *peer.Peer)) {
filter.onAcceptCallbackMutex.RLock()
result = filter.onAcceptCallback
......
......@@ -7,7 +7,6 @@ import (
"sync"
"github.com/iotaledger/goshimmer/packages/pow"
"github.com/iotaledger/hive.go/async"
"github.com/iotaledger/hive.go/autopeering/peer"
)
......@@ -22,7 +21,6 @@ var (
type PowFilter struct {
worker *pow.Worker
difficulty int
workerPool async.WorkerPool
mu sync.Mutex
acceptCallback func([]byte, *peer.Peer)
......@@ -39,13 +37,11 @@ func NewPowFilter(worker *pow.Worker, difficulty int) *PowFilter {
// Filter checks whether the given bytes pass the PoW validation and calls the corresponding callback.
func (f *PowFilter) Filter(msgBytes []byte, p *peer.Peer) {
	// validate the PoW synchronously and dispatch to the matching callback
	if err := f.validate(msgBytes); err != nil {
		f.reject(msgBytes, err, p)
		return
	}
	f.accept(msgBytes, p)
}
// OnAccept registers the given callback as the acceptance function of the filter.
......@@ -62,11 +58,6 @@ func (f *PowFilter) OnReject(callback func([]byte, error, *peer.Peer)) {
f.rejectCallback = callback
}
// Shutdown shuts down the filter.
func (f *PowFilter) Shutdown() {
	// gracefully stop the filter's worker pool
	f.workerPool.ShutdownGracefully()
}
func (f *PowFilter) accept(msgBytes []byte, p *peer.Peer) {
f.mu.Lock()
defer f.mu.Unlock()
......
......@@ -18,15 +18,14 @@ import (
)
var (
testPayload = payload.NewData([]byte("test"))
testPeer *peer.Peer = nil
testWorker = pow.New(crypto.BLAKE2b_512, 1)
testDifficulty = 10
testPayload = payload.NewData([]byte("test"))
testPeer *peer.Peer
testWorker = pow.New(crypto.BLAKE2b_512, 1)
testDifficulty = 10
)
func TestPowFilter_Filter(t *testing.T) {
filter := NewPowFilter(testWorker, testDifficulty)
defer filter.Shutdown()
// set callbacks
m := &callbackMock{}
......@@ -61,7 +60,6 @@ func TestPowFilter_Filter(t *testing.T) {
filter.Filter(msgPOWBytes, testPeer)
})
filter.Shutdown()
m.AssertExpectations(t)
}
......
......@@ -4,7 +4,6 @@ import (
"fmt"
"sync"
"github.com/iotaledger/hive.go/async"
"github.com/iotaledger/hive.go/autopeering/peer"
"github.com/iotaledger/hive.go/bytesfilter"
)
......@@ -17,7 +16,6 @@ type RecentlySeenBytesFilter struct {
bytesFilter *bytesfilter.BytesFilter
onAcceptCallback func(bytes []byte, peer *peer.Peer)
onRejectCallback func(bytes []byte, err error, peer *peer.Peer)
workerPool async.WorkerPool
onAcceptCallbackMutex sync.RWMutex
onRejectCallbackMutex sync.RWMutex
......@@ -30,22 +28,24 @@ func NewRecentlySeenBytesFilter() *RecentlySeenBytesFilter {
}
}
// Filter filters up on the given bytes and peer and calls the acceptance callback
// if the input passes or the rejection callback if the input is rejected.
func (filter *RecentlySeenBytesFilter) Filter(bytes []byte, peer *peer.Peer) {
	// Add reports whether the bytes were newly added; duplicates are rejected
	if filter.bytesFilter.Add(bytes) {
		filter.getAcceptCallback()(bytes, peer)
		return
	}
	filter.getRejectCallback()(bytes, ErrReceivedDuplicateBytes, peer)
}
// OnAccept registers the given callback as the acceptance function of the filter.
func (filter *RecentlySeenBytesFilter) OnAccept(callback func(bytes []byte, peer *peer.Peer)) {
	// swap the acceptance callback under the write lock
	filter.onAcceptCallbackMutex.Lock()
	defer filter.onAcceptCallbackMutex.Unlock()
	filter.onAcceptCallback = callback
}
// OnReject registers the given callback as the rejection function of the filter.
func (filter *RecentlySeenBytesFilter) OnReject(callback func(bytes []byte, err error, peer *peer.Peer)) {
filter.onRejectCallbackMutex.Lock()
filter.onRejectCallback = callback
......@@ -65,7 +65,3 @@ func (filter *RecentlySeenBytesFilter) getRejectCallback() (result func(bytes []
filter.onRejectCallbackMutex.Unlock()
return
}
// Shutdown shuts down the filter by gracefully stopping its worker pool.
func (filter *RecentlySeenBytesFilter) Shutdown() {
	filter.workerPool.ShutdownGracefully()
}
......@@ -13,6 +13,4 @@ type BytesFilter interface {
OnAccept(callback func(bytes []byte, peer *peer.Peer))
// OnReject registers the given callback as the rejection function of the filter.
OnReject(callback func(bytes []byte, err error, peer *peer.Peer))
// Shutdown shuts down the filter.
Shutdown()
}
......@@ -15,6 +15,4 @@ type MessageFilter interface {
OnAccept(callback func(msg *message.Message, peer *peer.Peer))
// OnAccept registers the given callback as the rejection function of the filter.
OnReject(callback func(msg *message.Message, err error, peer *peer.Peer))
// Shutdown shuts down the filter.
Shutdown()
}
......@@ -70,21 +70,6 @@ func (messageParser *MessageParser) AddMessageFilter(filter MessageFilter) {
messageParser.messageFiltersModified.Set()
}
// Shutdown shuts down the message parser and all of its registered filters.
func (messageParser *MessageParser) Shutdown() {
	// shut down every byte filter while holding the corresponding mutex
	func() {
		messageParser.bytesFiltersMutex.Lock()
		defer messageParser.bytesFiltersMutex.Unlock()
		for _, bf := range messageParser.bytesFilters {
			bf.Shutdown()
		}
	}()

	// shut down every message filter while holding the corresponding mutex
	func() {
		messageParser.messageFiltersMutex.Lock()
		defer messageParser.messageFiltersMutex.Unlock()
		for _, mf := range messageParser.messageFilters {
			mf.Shutdown()
		}
	}()
}
// sets up the byte filter data flow chain.
func (messageParser *MessageParser) setupBytesFilterDataFlow() {
if !messageParser.byteFiltersModified.IsSet() {
......
......@@ -22,8 +22,6 @@ func BenchmarkMessageParser_ParseBytesSame(b *testing.B) {
for i := 0; i < b.N; i++ {
msgParser.Parse(msgBytes, nil)
}
msgParser.Shutdown()
}
func BenchmarkMessageParser_ParseBytesDifferent(b *testing.B) {
......@@ -39,8 +37,6 @@ func BenchmarkMessageParser_ParseBytesDifferent(b *testing.B) {
for i := 0; i < b.N; i++ {
msgParser.Parse(messageBytes[i], nil)
}
msgParser.Shutdown()
}
func TestMessageParser_ParseMessage(t *testing.T) {
......@@ -52,8 +48,6 @@ func TestMessageParser_ParseMessage(t *testing.T) {
msgParser.Events.MessageParsed.Attach(events.NewClosure(func(msg *message.Message) {
log.Infof("parsed message")
}))
msgParser.Shutdown()
}
func newTestMessage(payloadString string) *message.Message {
......
......@@ -33,17 +33,18 @@ func New(optionalOptions ...Option) *MessageRequester {
// StartRequest initiates a regular triggering of the StartRequest event until it has been stopped using StopRequest.
func (requester *MessageRequester) StartRequest(id message.Id) {
	requester.scheduledRequestsMutex.Lock()
	defer requester.scheduledRequestsMutex.Unlock()

	// ignore already scheduled requests
	if _, exists := requester.scheduledRequests[id]; exists {
		return
	}

	// trigger the event and schedule the retry while still holding the lock,
	// so a concurrent StartRequest for the same id cannot trigger again
	requester.Events.SendRequest.Trigger(id)
	requester.scheduledRequests[id] = time.AfterFunc(requester.options.retryInterval, func() { requester.reRequest(id) })
}
// StopRequest stops requests for the given message to further happen.
......
......@@ -3,7 +3,6 @@ package tangle
import (
"container/list"
"fmt"
"runtime"
"time"
"github.com/iotaledger/goshimmer/packages/binary/messagelayer/message"
......@@ -60,7 +59,8 @@ func New(store kvstore.KVStore) (result *Tangle) {
result.DBStats()
result.solidifierWorkerPool.Tune(runtime.GOMAXPROCS(0))
result.solidifierWorkerPool.Tune(1024)
result.storeMessageWorkerPool.Tune(1024)
return
}
......@@ -326,3 +326,13 @@ func (tangle *Tangle) deleteFutureCone(messageId message.Id) {
})
}
}
// SolidifierWorkerPoolStatus returns the name and the load of the workerpool.
func (tangle *Tangle) SolidifierWorkerPoolStatus() (name string, load int) {
	name = "Solidifier"
	load = tangle.solidifierWorkerPool.RunningWorkers()
	return
}
// StoreMessageWorkerPoolStatus returns the name and the load of the workerpool.
func (tangle *Tangle) StoreMessageWorkerPoolStatus() (name string, load int) {
	name = "StoreMessage"
	load = tangle.storeMessageWorkerPool.RunningWorkers()
	return
}
......@@ -3,23 +3,32 @@ package gossip
import (
"fmt"
"net"
"runtime"
"sync"
"github.com/golang/protobuf/proto"
"github.com/iotaledger/goshimmer/packages/binary/messagelayer/message"
pb "github.com/iotaledger/goshimmer/packages/gossip/proto"
"github.com/iotaledger/goshimmer/packages/gossip/server"
"github.com/iotaledger/hive.go/async"
"github.com/iotaledger/hive.go/autopeering/peer"
"github.com/iotaledger/hive.go/events"
"github.com/iotaledger/hive.go/identity"
"github.com/iotaledger/hive.go/logger"
"github.com/iotaledger/hive.go/workerpool"
)
const (
	// maxPacketSize is the maximum size, in bytes, of a single gossip packet (64 KiB).
	maxPacketSize = 64 * 1024
)
var (
	// messageWorkerCount scales the incoming-message worker pool with the number of usable CPUs.
	messageWorkerCount = runtime.GOMAXPROCS(0) * 4
	// messageWorkerQueueSize bounds the number of queued incoming message packets.
	messageWorkerQueueSize = 1000
	// messageRequestWorkerCount is the number of workers serving message requests.
	messageRequestWorkerCount = 1
	// messageRequestWorkerQueueSize bounds the number of queued message requests.
	messageRequestWorkerQueueSize = 100
)
// LoadMessageFunc defines a function that returns the message for the given id.
type LoadMessageFunc func(messageId message.Id) ([]byte, error)
......@@ -36,8 +45,10 @@ type Manager struct {
srv *server.TCP
neighbors map[identity.ID]*Neighbor
// inboxWorkerPool defines a worker pool where all incoming messages are processed.
inboxWorkerPool async.WorkerPool
// messageWorkerPool defines a worker pool where all incoming messages are processed.
messageWorkerPool *workerpool.WorkerPool
messageRequestWorkerPool *workerpool.WorkerPool
}
// NewManager creates a new Manager.
......@@ -55,6 +66,21 @@ func NewManager(local *peer.Local, f LoadMessageFunc, log *logger.Logger) *Manag
srv: nil,
neighbors: make(map[identity.ID]*Neighbor),
}
m.messageWorkerPool = workerpool.New(func(task workerpool.Task) {
m.processPacketMessage(task.Param(0).([]byte), task.Param(1).(*Neighbor))
task.Return(nil)
}, workerpool.WorkerCount(messageWorkerCount), workerpool.QueueSize(messageWorkerQueueSize))
m.messageRequestWorkerPool = workerpool.New(func(task workerpool.Task) {
m.processMessageRequest(task.Param(0).([]byte), task.Param(1).(*Neighbor))
task.Return(nil)
}, workerpool.WorkerCount(messageRequestWorkerCount), workerpool.QueueSize(messageRequestWorkerQueueSize))
return m
}
......@@ -64,6 +90,9 @@ func (m *Manager) Start(srv *server.TCP) {
defer m.mu.Unlock()
m.srv = srv
m.messageWorkerPool.Start()
m.messageRequestWorkerPool.Start()
}
// Close stops the manager and closes all established connections.
......@@ -71,7 +100,8 @@ func (m *Manager) Close() {
m.stop()
m.wg.Wait()
m.inboxWorkerPool.ShutdownGracefully()
m.messageWorkerPool.Stop()
m.messageRequestWorkerPool.Stop()
}
// Events returns the events related to the gossip protocol.
......@@ -213,11 +243,9 @@ func (m *Manager) addNeighbor(peer *peer.Peer, connectorFunc func(*peer.Peer) (n
nbr.Events.ReceiveMessage.Attach(events.NewClosure(func(data []byte) {
dataCopy := make([]byte, len(data))
copy(dataCopy, data)
m.inboxWorkerPool.Submit(func() {
if err := m.handlePacket(dataCopy, nbr); err != nil {
m.log.Debugw("error handling packet", "err", err)
}
})
if err := m.handlePacket(dataCopy, nbr); err != nil {
m.log.Debugw("error handling packet", "err", err)
}
}))
m.neighbors[peer.ID()] = nbr
......@@ -236,31 +264,14 @@ func (m *Manager) handlePacket(data []byte, nbr *Neighbor) error {
switch pb.PacketType(data[0]) {
case pb.PacketMessage:
packet := new(pb.Message)
if err := proto.Unmarshal(data[1:], packet); err != nil {
return fmt.Errorf("invalid packet: %w", err)
if _, added := m.messageWorkerPool.TrySubmit(data, nbr); !added {
return fmt.Errorf("messageWorkerPool full: packet message discarded")
}
m.events.MessageReceived.Trigger(&MessageReceivedEvent{Data: packet.GetData(), Peer: nbr.Peer})
case pb.PacketMessageRequest:
packet := new(pb.MessageRequest)
if err := proto.Unmarshal(data[1:], packet); err != nil {
return fmt.Errorf("invalid packet: %w", err)
}
msgID, _, err := message.IdFromBytes(packet.GetId())
if err != nil {
return fmt.Errorf("invalid message id: %w", err)
}
msgBytes, err := m.loadMessageFunc(msgID)
if err != nil {
m.log.Debugw("error loading message", "msg-id", msgID, "err", err)
return nil
if _, added := m.messageRequestWorkerPool.TrySubmit(data, nbr); !added {
return fmt.Errorf("messageRequestWorkerPool full: message request discarded")
}
// send the loaded message directly to the neighbor
_, _ = nbr.Write(marshal(&pb.Message{Data: msgBytes}))
default:
return ErrInvalidPacket
}
......@@ -280,3 +291,41 @@ func marshal(packet pb.Packet) []byte {
}
return append([]byte{byte(packetType)}, data...)
}
// MessageWorkerPoolStatus returns the name and the load of the workerpool.
func (m *Manager) MessageWorkerPoolStatus() (name string, load int) {
	name = "messageWorkerPool"
	load = m.messageWorkerPool.GetPendingQueueSize()
	return
}
// MessageRequestWorkerPoolStatus returns the name and the load of the workerpool.
func (m *Manager) MessageRequestWorkerPoolStatus() (name string, load int) {
	name = "messageRequestWorkerPool"
	load = m.messageRequestWorkerPool.GetPendingQueueSize()
	return
}
// processPacketMessage unmarshals a message packet received from nbr and triggers the
// MessageReceived event. data still contains the leading packet-type byte, which is
// skipped when unmarshaling.
func (m *Manager) processPacketMessage(data []byte, nbr *Neighbor) {
	packet := new(pb.Message)
	if err := proto.Unmarshal(data[1:], packet); err != nil {
		m.log.Debugw("error processing packet", "err", err)
		// do not fire the event with a partially decoded packet
		return
	}
	m.events.MessageReceived.Trigger(&MessageReceivedEvent{Data: packet.GetData(), Peer: nbr.Peer})
}
// processMessageRequest unmarshals a message request received from nbr, loads the
// requested message via loadMessageFunc and writes it back to the requesting neighbor.
// data still contains the leading packet-type byte, which is skipped when unmarshaling.
func (m *Manager) processMessageRequest(data []byte, nbr *Neighbor) {
	packet := new(pb.MessageRequest)
	if err := proto.Unmarshal(data[1:], packet); err != nil {
		m.log.Debugw("invalid packet", "err", err)
		return
	}
	msgID, _, err := message.IdFromBytes(packet.GetId())
	if err != nil {
		m.log.Debugw("invalid message id:", "err", err)
		return
	}
	msgBytes, err := m.loadMessageFunc(msgID)
	if err != nil {
		m.log.Debugw("error loading message", "msg-id", msgID, "err", err)
		return
	}
	// send the loaded message directly to the neighbor; the write error is
	// deliberately ignored (best-effort reply to a gossip request)
	_, _ = nbr.Write(marshal(&pb.Message{Data: msgBytes}))
}
......@@ -132,7 +132,6 @@ func run(*node.Plugin) {
if err := daemon.BackgroundWorker("Tangle", func(shutdownSignal <-chan struct{}) {
<-shutdownSignal
messageFactory.Shutdown()
messageParser.Shutdown()
_tangle.Shutdown()
}, shutdown.PriorityTangle); err != nil {
log.Panicf("Failed to start as daemon: %s", err)
......
......@@ -11,6 +11,8 @@ const (
CfgPrometheusProcessMetrics = "prometheus.processMetrics"
// CfgPrometheusPromhttpMetrics defines the config flag to enable/disable promhttp metrics.
CfgPrometheusPromhttpMetrics = "prometheus.promhttpMetrics"
// CfgPrometheusWorkerpoolMetrics defines the config flag to enable/disable workerpool metrics.
CfgPrometheusWorkerpoolMetrics = "prometheus.workerpoolMetrics"
// CfgPrometheusBindAddress defines the config flag of the bind address on which the Prometheus exporter listens on.
CfgPrometheusBindAddress = "prometheus.bindAddress"
)
......@@ -20,4 +22,5 @@ func init() {
flag.Bool(CfgPrometheusGoMetrics, false, "include go metrics")
flag.Bool(CfgPrometheusProcessMetrics, false, "include process metrics")
flag.Bool(CfgPrometheusPromhttpMetrics, false, "include promhttp metrics")
flag.Bool(CfgPrometheusWorkerpoolMetrics, false, "include workerpool metrics")
}
......@@ -29,6 +29,10 @@ var (
func configure(plugin *node.Plugin) {
log = logger.NewLogger(plugin.Name)
if config.Node().GetBool(CfgPrometheusWorkerpoolMetrics) {
registerWorkerpoolMetrics()
}
if config.Node().GetBool(CfgPrometheusGoMetrics) {
registry.MustRegister(prometheus.NewGoCollector())
}
......
package prometheus
import (
"github.com/iotaledger/goshimmer/plugins/gossip"
"github.com/iotaledger/goshimmer/plugins/messagelayer"
"github.com/prometheus/client_golang/prometheus"
)
var (
	// workerpools reports the current load of each monitored worker pool, labeled by pool name.
	workerpools *prometheus.GaugeVec
)
// registerWorkerpoolMetrics creates the workerpool load gauge, registers it and
// hooks its collector into the metrics collection cycle.
func registerWorkerpoolMetrics() {
	opts := prometheus.GaugeOpts{
		Name: "workerpools_load",
		Help: "Info about workerpools load",
	}
	workerpools = prometheus.NewGaugeVec(opts, []string{"name"})

	registry.MustRegister(workerpools)
	addCollect(collectWorkerpoolMetrics)
}
// collectWorkerpoolMetrics samples the load of every monitored worker pool and
// exports each value as a gauge sample labeled with the pool's name.
func collectWorkerpoolMetrics() {
	// each entry yields (name, load) for one worker pool
	statusFuncs := []func() (string, int){
		gossip.Manager().MessageWorkerPoolStatus,
		gossip.Manager().MessageRequestWorkerPoolStatus,
		messagelayer.Tangle().SolidifierWorkerPoolStatus,
		messagelayer.Tangle().StoreMessageWorkerPoolStatus,
	}
	for _, status := range statusFuncs {
		name, load := status()
		workerpools.WithLabelValues(name).Set(float64(load))
	}
}
......@@ -214,7 +214,7 @@ func (d *DockerContainer) Remove() error {
// Stop stops a container without terminating the process.
// The process is blocked until the container stops or the timeout expires.
func (d *DockerContainer) Stop() error {
	// give slow containers up to three minutes to shut down cleanly
	duration := 3 * time.Minute
	return d.client.ContainerStop(context.Background(), d.id, &duration)
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment