-
Notifications
You must be signed in to change notification settings - Fork 1.3k
kvm: Handle storage issue on NFS/KVM in multiple ways #4708
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Closed
Closed
Changes from all commits
Commits
Show all changes
4 commits
Select commit
Hold shift + click to select a range
1cd7495
kvm: Handle storage issue on NFS/KVM in multiple ways
ustcweizhou ce62820
kvm: Schedule investigate tasks for disconnected hosts
ustcweizhou f4e2f20
kvm: Add option 'noaction' for storage issue handling
ustcweizhou 9fd17bd
Merge remote-tracking branch 'apache/main' into 4.16-kvm-storage-issues
weizhouapache File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -29,6 +29,7 @@ | |||||
| import java.util.concurrent.ExecutorService; | ||||||
| import java.util.concurrent.LinkedBlockingQueue; | ||||||
| import java.util.concurrent.ScheduledExecutorService; | ||||||
| import java.util.concurrent.ScheduledFuture; | ||||||
| import java.util.concurrent.ScheduledThreadPoolExecutor; | ||||||
| import java.util.concurrent.ThreadPoolExecutor; | ||||||
| import java.util.concurrent.TimeUnit; | ||||||
|
|
@@ -77,6 +78,8 @@ | |||||
| import com.cloud.agent.transport.Request; | ||||||
| import com.cloud.agent.transport.Response; | ||||||
| import com.cloud.alert.AlertManager; | ||||||
| import com.cloud.cluster.ManagementServerHostVO; | ||||||
| import com.cloud.cluster.dao.ManagementServerHostDao; | ||||||
| import com.cloud.configuration.ManagementServiceConfiguration; | ||||||
| import com.cloud.dc.ClusterVO; | ||||||
| import com.cloud.dc.DataCenterVO; | ||||||
|
|
@@ -105,6 +108,7 @@ | |||||
| import com.cloud.utils.concurrency.NamedThreadFactory; | ||||||
| import com.cloud.utils.db.DB; | ||||||
| import com.cloud.utils.db.EntityManager; | ||||||
| import com.cloud.utils.db.Filter; | ||||||
| import com.cloud.utils.db.QueryBuilder; | ||||||
| import com.cloud.utils.db.SearchCriteria.Op; | ||||||
| import com.cloud.utils.db.TransactionLegacy; | ||||||
|
|
@@ -121,6 +125,11 @@ | |||||
| import com.cloud.utils.time.InaccurateClock; | ||||||
| import com.google.common.base.Strings; | ||||||
|
|
||||||
| import static com.cloud.configuration.ConfigurationManagerImpl.KVM_HEARTBEAT_FAILURE_ACTION; | ||||||
| import static com.cloud.configuration.ConfigurationManagerImpl.KVM_HEARTBEAT_UPDATE_MAX_RETRIES; | ||||||
| import static com.cloud.configuration.ConfigurationManagerImpl.KVM_HEARTBEAT_UPDATE_RETRY_SLEEP; | ||||||
| import static com.cloud.configuration.ConfigurationManagerImpl.KVM_HEARTBEAT_UPDATE_TIMEOUT; | ||||||
|
|
||||||
| /** | ||||||
| * Implementation of the Agent Manager. This class controls the connection to the agents. | ||||||
| **/ | ||||||
|
|
@@ -157,6 +166,8 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl | |||||
| protected ConfigurationDao _configDao = null; | ||||||
| @Inject | ||||||
| protected ClusterDao _clusterDao = null; | ||||||
| @Inject | ||||||
| protected ManagementServerHostDao _msHostDao; | ||||||
|
|
||||||
| @Inject | ||||||
| protected HighAvailabilityManager _haMgr = null; | ||||||
|
|
@@ -178,12 +189,16 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl | |||||
| protected ScheduledExecutorService _directAgentExecutor; | ||||||
| protected ScheduledExecutorService _cronJobExecutor; | ||||||
| protected ScheduledExecutorService _monitorExecutor; | ||||||
| protected ScheduledExecutorService _scanHostsExecutor; | ||||||
| protected ScheduledExecutorService _investigatorExecutor; | ||||||
|
|
||||||
| private int _directAgentThreadCap; | ||||||
|
|
||||||
| protected StateMachine2<Status, Status.Event, Host> _statusStateMachine = Status.getStateMachine(); | ||||||
| private final ConcurrentHashMap<Long, Long> _pingMap = new ConcurrentHashMap<Long, Long>(10007); | ||||||
|
|
||||||
| private final ConcurrentHashMap<Long, ScheduledFuture<?>> _investigateTasksMap = new ConcurrentHashMap<>(); | ||||||
|
|
||||||
| @Inject | ||||||
| ResourceManager _resourceMgr; | ||||||
| @Inject | ||||||
|
|
@@ -203,6 +218,14 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl | |||||
| protected final ConfigKey<Boolean> CheckTxnBeforeSending = new ConfigKey<Boolean>("Developer", Boolean.class, "check.txn.before.sending.agent.commands", "false", | ||||||
| "This parameter allows developers to enable a check to see if a transaction wraps commands that are sent to the resource. This is not to be enabled on production systems.", true); | ||||||
|
|
||||||
| protected final ConfigKey<Boolean> InvestigateDisconnectedHosts = new ConfigKey<>("Advanced", Boolean.class, "investigate.disconnected.hosts", | ||||||
| "false", "Determines whether to investigate VMs on disconnected hosts", false); | ||||||
| protected final ConfigKey<Integer> InvestigateDisconnectedHostsInterval = new ConfigKey<>("Advanced", Integer.class, "investigate.disconnected.hosts.interval", | ||||||
| "300", "The time (in seconds) between VM investigation on disconnected hosts.", false); | ||||||
| protected final ConfigKey<Integer> InvestigateDisconnectedHostsPoolSize = new ConfigKey<Integer>("Advanced", Integer.class, "investigate.disconnected.hosts.pool.size", "10", | ||||||
| "Default pool size to investigate disconnected hosts", false); | ||||||
|
|
||||||
|
|
||||||
| @Override | ||||||
| public boolean configure(final String name, final Map<String, Object> params) throws ConfigurationException { | ||||||
|
|
||||||
|
|
@@ -238,6 +261,9 @@ public boolean configure(final String name, final Map<String, Object> params) th | |||||
|
|
||||||
| _monitorExecutor = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("AgentMonitor")); | ||||||
|
|
||||||
| _scanHostsExecutor = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("HostsScanner")); | ||||||
| _investigatorExecutor = new ScheduledThreadPoolExecutor(InvestigateDisconnectedHostsPoolSize.value(), new NamedThreadFactory("DisconnectHostsInvestigator")); | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
|
|
||||||
| return true; | ||||||
| } | ||||||
|
|
||||||
|
|
@@ -614,6 +640,10 @@ public boolean start() { | |||||
|
|
||||||
| _monitorExecutor.scheduleWithFixedDelay(new MonitorTask(), mgmtServiceConf.getPingInterval(), mgmtServiceConf.getPingInterval(), TimeUnit.SECONDS); | ||||||
|
|
||||||
| if (InvestigateDisconnectedHosts.value()) { | ||||||
| _scanHostsExecutor.scheduleAtFixedRate(new ScanDisconnectedHostsTask(), 60, 60, TimeUnit.SECONDS); | ||||||
| } | ||||||
|
|
||||||
| return true; | ||||||
| } | ||||||
|
|
||||||
|
|
@@ -773,6 +803,8 @@ public boolean stop() { | |||||
|
|
||||||
| _connectExecutor.shutdownNow(); | ||||||
| _monitorExecutor.shutdownNow(); | ||||||
| _investigatorExecutor.shutdownNow(); | ||||||
|
|
||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||||||
| return true; | ||||||
| } | ||||||
|
|
||||||
|
|
@@ -1725,7 +1757,8 @@ public String getConfigComponentName() { | |||||
| @Override | ||||||
| public ConfigKey<?>[] getConfigKeys() { | ||||||
| return new ConfigKey<?>[] { CheckTxnBeforeSending, Workers, Port, Wait, AlertWait, DirectAgentLoadSize, DirectAgentPoolSize, | ||||||
| DirectAgentThreadCap }; | ||||||
| DirectAgentThreadCap, | ||||||
| InvestigateDisconnectedHosts, InvestigateDisconnectedHostsInterval, InvestigateDisconnectedHostsPoolSize }; | ||||||
| } | ||||||
|
|
||||||
| protected class SetHostParamsListener implements Listener { | ||||||
|
|
@@ -1758,8 +1791,11 @@ public void processConnect(final Host host, final StartupCommand cmd, final bool | |||||
| if (cmd instanceof StartupRoutingCommand) { | ||||||
| if (((StartupRoutingCommand)cmd).getHypervisorType() == HypervisorType.KVM || ((StartupRoutingCommand)cmd).getHypervisorType() == HypervisorType.LXC) { | ||||||
| Map<String, String> params = new HashMap<String, String>(); | ||||||
| params.put("router.aggregation.command.each.timeout", _configDao.getValue("router.aggregation.command.each.timeout")); | ||||||
|
|
||||||
| Arrays.asList(KVM_HEARTBEAT_UPDATE_MAX_RETRIES, | ||||||
| KVM_HEARTBEAT_UPDATE_RETRY_SLEEP, | ||||||
| KVM_HEARTBEAT_UPDATE_TIMEOUT, | ||||||
| KVM_HEARTBEAT_FAILURE_ACTION) | ||||||
| .forEach(c -> params.put(c, _configDao.getValue(c))); | ||||||
| try { | ||||||
| SetHostParamsCommand cmds = new SetHostParamsCommand(params); | ||||||
| Commands c = new Commands(cmds); | ||||||
|
|
@@ -1838,4 +1874,91 @@ public void propagateChangeToAgents(Map<String, String> params) { | |||||
| sendCommandToAgents(hostsPerZone, params); | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
| protected class ScanDisconnectedHostsTask extends ManagedContextRunnable { | ||||||
|
|
||||||
| @Override | ||||||
| protected void runInContext() { | ||||||
| try { | ||||||
| ManagementServerHostVO msHost = _msHostDao.findOneInUpState(new Filter(ManagementServerHostVO.class, "id", true, 0L, 1L)); | ||||||
| if (msHost == null || (msHost.getMsid() != _nodeId)) { | ||||||
| s_logger.debug("Skipping disconnected hosts scan task"); | ||||||
| for (Long hostId : _investigateTasksMap.keySet()) { | ||||||
| cancelInvestigationTask(hostId); | ||||||
| } | ||||||
| return; | ||||||
| } | ||||||
| for (HostVO host : _hostDao.listByType(Host.Type.Routing)) { | ||||||
| if (host.getStatus() == Status.Disconnected) { | ||||||
| scheduleInvestigationTask(host.getId()); | ||||||
| } | ||||||
| } | ||||||
| } catch (final Exception e) { | ||||||
| s_logger.error("Exception caught while scanning disconnected hosts : ", e); | ||||||
| } | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
| protected class InvestigationTask extends ManagedContextRunnable { | ||||||
| Long _hostId; | ||||||
|
|
||||||
| InvestigationTask(final Long hostId) { | ||||||
| _hostId = hostId; | ||||||
| } | ||||||
|
|
||||||
| @Override | ||||||
| protected void runInContext() { | ||||||
| try { | ||||||
| final long hostId = _hostId; | ||||||
| s_logger.info("Investigating host " + hostId + " to determine its actual state"); | ||||||
| HostVO host = _hostDao.findById(hostId); | ||||||
| if (host == null) { | ||||||
| s_logger.info("Cancelling investigation on host " + hostId + " which might has been removed"); | ||||||
| cancelInvestigationTask(hostId); | ||||||
| return; | ||||||
| } | ||||||
| if (host.getStatus() != Status.Disconnected) { | ||||||
| s_logger.info("Cancelling investigation on host " + hostId + " in status " + host.getStatus()); | ||||||
| cancelInvestigationTask(hostId); | ||||||
| return; | ||||||
| } | ||||||
| Status determinedState = _haMgr.investigate(hostId); | ||||||
| s_logger.info("Investigators determine the status of host " + hostId + " is " + determinedState); | ||||||
| if (determinedState == Status.Down) { | ||||||
| agentStatusTransitTo(host, Status.Event.HostDown, _nodeId); | ||||||
| s_logger.info("Scheduling VMs restart on host " + hostId + " which is Down"); | ||||||
| _haMgr.scheduleRestartForVmsOnHost(host, true); | ||||||
| s_logger.info("Cancelling investigation on host " + hostId + " which is Down"); | ||||||
| cancelInvestigationTask(hostId); | ||||||
| } | ||||||
| } catch (final Exception e) { | ||||||
| s_logger.error("Exception caught while handling investigation task: ", e); | ||||||
| } | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
| private void scheduleInvestigationTask(final Long hostId) { | ||||||
| ScheduledFuture future = _investigateTasksMap.get(hostId); | ||||||
| if (future != null) { | ||||||
| s_logger.info("There is already a task to investigate host " + hostId); | ||||||
| } else { | ||||||
| ScheduledFuture scheduledFuture = _investigatorExecutor.scheduleWithFixedDelay(new InvestigationTask(hostId), InvestigateDisconnectedHostsInterval.value(), | ||||||
| InvestigateDisconnectedHostsInterval.value(), TimeUnit.SECONDS); | ||||||
| _investigateTasksMap.put(hostId, scheduledFuture); | ||||||
| s_logger.info("Scheduled a task to investigate host " + hostId); | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
| private void cancelInvestigationTask(final Long hostId) { | ||||||
| ScheduledFuture future = _investigateTasksMap.get(hostId); | ||||||
| if (future != null) { | ||||||
| try { | ||||||
| future.cancel(false); | ||||||
| s_logger.info("Cancelled a task to investigate host " + hostId); | ||||||
| _investigateTasksMap.remove(hostId); | ||||||
| } catch (Exception e) { | ||||||
| s_logger.error("Exception caught while cancelling investigation task: ", e); | ||||||
| } | ||||||
| } | ||||||
| } | ||||||
| } | ||||||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.