Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -77,6 +78,8 @@
import com.cloud.agent.transport.Request;
import com.cloud.agent.transport.Response;
import com.cloud.alert.AlertManager;
import com.cloud.cluster.ManagementServerHostVO;
import com.cloud.cluster.dao.ManagementServerHostDao;
import com.cloud.configuration.ManagementServiceConfiguration;
import com.cloud.dc.ClusterVO;
import com.cloud.dc.DataCenterVO;
Expand Down Expand Up @@ -105,6 +108,7 @@
import com.cloud.utils.concurrency.NamedThreadFactory;
import com.cloud.utils.db.DB;
import com.cloud.utils.db.EntityManager;
import com.cloud.utils.db.Filter;
import com.cloud.utils.db.QueryBuilder;
import com.cloud.utils.db.SearchCriteria.Op;
import com.cloud.utils.db.TransactionLegacy;
Expand All @@ -121,6 +125,11 @@
import com.cloud.utils.time.InaccurateClock;
import com.google.common.base.Strings;

import static com.cloud.configuration.ConfigurationManagerImpl.KVM_HEARTBEAT_FAILURE_ACTION;
import static com.cloud.configuration.ConfigurationManagerImpl.KVM_HEARTBEAT_UPDATE_MAX_RETRIES;
import static com.cloud.configuration.ConfigurationManagerImpl.KVM_HEARTBEAT_UPDATE_RETRY_SLEEP;
import static com.cloud.configuration.ConfigurationManagerImpl.KVM_HEARTBEAT_UPDATE_TIMEOUT;

/**
* Implementation of the Agent Manager. This class controls the connection to the agents.
**/
Expand Down Expand Up @@ -157,6 +166,8 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl
protected ConfigurationDao _configDao = null;
@Inject
protected ClusterDao _clusterDao = null;
@Inject
protected ManagementServerHostDao _msHostDao;

@Inject
protected HighAvailabilityManager _haMgr = null;
Expand All @@ -178,12 +189,16 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl
protected ScheduledExecutorService _directAgentExecutor;
protected ScheduledExecutorService _cronJobExecutor;
protected ScheduledExecutorService _monitorExecutor;
protected ScheduledExecutorService _scanHostsExecutor;
protected ScheduledExecutorService _investigatorExecutor;

private int _directAgentThreadCap;

protected StateMachine2<Status, Status.Event, Host> _statusStateMachine = Status.getStateMachine();
private final ConcurrentHashMap<Long, Long> _pingMap = new ConcurrentHashMap<Long, Long>(10007);

private final ConcurrentHashMap<Long, ScheduledFuture<?>> _investigateTasksMap = new ConcurrentHashMap<>();

@Inject
ResourceManager _resourceMgr;
@Inject
Expand All @@ -203,6 +218,14 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl
protected final ConfigKey<Boolean> CheckTxnBeforeSending = new ConfigKey<Boolean>("Developer", Boolean.class, "check.txn.before.sending.agent.commands", "false",
"This parameter allows developers to enable a check to see if a transaction wraps commands that are sent to the resource. This is not to be enabled on production systems.", true);

protected final ConfigKey<Boolean> InvestigateDisconnectedHosts = new ConfigKey<>("Advanced", Boolean.class, "investigate.disconnected.hosts",
"false", "Determines whether to investigate VMs on disconnected hosts", false);
protected final ConfigKey<Integer> InvestigateDisconnectedHostsInterval = new ConfigKey<>("Advanced", Integer.class, "investigate.disconnected.hosts.interval",
"300", "The time (in seconds) between VM investigation on disconnected hosts.", false);
protected final ConfigKey<Integer> InvestigateDisconnectedHostsPoolSize = new ConfigKey<Integer>("Advanced", Integer.class, "investigate.disconnected.hosts.pool.size", "10",
"Default pool size to investigate disconnected hosts", false);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"Default pool size to investigate disconnected hosts", false);
"The thread pool size to investigate disconnected hosts", false);



@Override
public boolean configure(final String name, final Map<String, Object> params) throws ConfigurationException {

Expand Down Expand Up @@ -238,6 +261,9 @@ public boolean configure(final String name, final Map<String, Object> params) th

_monitorExecutor = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("AgentMonitor"));

_scanHostsExecutor = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("HostsScanner"));
_investigatorExecutor = new ScheduledThreadPoolExecutor(InvestigateDisconnectedHostsPoolSize.value(), new NamedThreadFactory("DisconnectHostsInvestigator"));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
_investigatorExecutor = new ScheduledThreadPoolExecutor(InvestigateDisconnectedHostsPoolSize.value(), new NamedThreadFactory("DisconnectHostsInvestigator"));
_investigatorExecutor = new ScheduledThreadPoolExecutor(InvestigateDisconnectedHostsPoolSize.value(), new NamedThreadFactory("DisconnectedHostsInvestigator"));


return true;
}

Expand Down Expand Up @@ -614,6 +640,10 @@ public boolean start() {

_monitorExecutor.scheduleWithFixedDelay(new MonitorTask(), mgmtServiceConf.getPingInterval(), mgmtServiceConf.getPingInterval(), TimeUnit.SECONDS);

if (InvestigateDisconnectedHosts.value()) {
_scanHostsExecutor.scheduleAtFixedRate(new ScanDisconnectedHostsTask(), 60, 60, TimeUnit.SECONDS);
}

return true;
}

Expand Down Expand Up @@ -773,6 +803,8 @@ public boolean stop() {

_connectExecutor.shutdownNow();
_monitorExecutor.shutdownNow();
_investigatorExecutor.shutdownNow();

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_scanHostsExecutor.shutdownNow() missing

return true;
}

Expand Down Expand Up @@ -1725,7 +1757,8 @@ public String getConfigComponentName() {
@Override
public ConfigKey<?>[] getConfigKeys() {
return new ConfigKey<?>[] { CheckTxnBeforeSending, Workers, Port, Wait, AlertWait, DirectAgentLoadSize, DirectAgentPoolSize,
DirectAgentThreadCap };
DirectAgentThreadCap,
InvestigateDisconnectedHosts, InvestigateDisconnectedHostsInterval, InvestigateDisconnectedHostsPoolSize };
}

protected class SetHostParamsListener implements Listener {
Expand Down Expand Up @@ -1758,8 +1791,11 @@ public void processConnect(final Host host, final StartupCommand cmd, final bool
if (cmd instanceof StartupRoutingCommand) {
if (((StartupRoutingCommand)cmd).getHypervisorType() == HypervisorType.KVM || ((StartupRoutingCommand)cmd).getHypervisorType() == HypervisorType.LXC) {
Map<String, String> params = new HashMap<String, String>();
params.put("router.aggregation.command.each.timeout", _configDao.getValue("router.aggregation.command.each.timeout"));

Arrays.asList(KVM_HEARTBEAT_UPDATE_MAX_RETRIES,
KVM_HEARTBEAT_UPDATE_RETRY_SLEEP,
KVM_HEARTBEAT_UPDATE_TIMEOUT,
KVM_HEARTBEAT_FAILURE_ACTION)
.forEach(c -> params.put(c, _configDao.getValue(c)));
try {
SetHostParamsCommand cmds = new SetHostParamsCommand(params);
Commands c = new Commands(cmds);
Expand Down Expand Up @@ -1838,4 +1874,91 @@ public void propagateChangeToAgents(Map<String, String> params) {
sendCommandToAgents(hostsPerZone, params);
}
}

protected class ScanDisconnectedHostsTask extends ManagedContextRunnable {

@Override
protected void runInContext() {
try {
ManagementServerHostVO msHost = _msHostDao.findOneInUpState(new Filter(ManagementServerHostVO.class, "id", true, 0L, 1L));
if (msHost == null || (msHost.getMsid() != _nodeId)) {
s_logger.debug("Skipping disconnected hosts scan task");
for (Long hostId : _investigateTasksMap.keySet()) {
cancelInvestigationTask(hostId);
}
return;
}
for (HostVO host : _hostDao.listByType(Host.Type.Routing)) {
if (host.getStatus() == Status.Disconnected) {
scheduleInvestigationTask(host.getId());
}
}
} catch (final Exception e) {
s_logger.error("Exception caught while scanning disconnected hosts : ", e);
}
}
}

protected class InvestigationTask extends ManagedContextRunnable {
Long _hostId;

InvestigationTask(final Long hostId) {
_hostId = hostId;
}

@Override
protected void runInContext() {
try {
final long hostId = _hostId;
s_logger.info("Investigating host " + hostId + " to determine its actual state");
HostVO host = _hostDao.findById(hostId);
if (host == null) {
s_logger.info("Cancelling investigation on host " + hostId + " which might has been removed");
cancelInvestigationTask(hostId);
return;
}
if (host.getStatus() != Status.Disconnected) {
s_logger.info("Cancelling investigation on host " + hostId + " in status " + host.getStatus());
cancelInvestigationTask(hostId);
return;
}
Status determinedState = _haMgr.investigate(hostId);
s_logger.info("Investigators determine the status of host " + hostId + " is " + determinedState);
if (determinedState == Status.Down) {
agentStatusTransitTo(host, Status.Event.HostDown, _nodeId);
s_logger.info("Scheduling VMs restart on host " + hostId + " which is Down");
_haMgr.scheduleRestartForVmsOnHost(host, true);
s_logger.info("Cancelling investigation on host " + hostId + " which is Down");
cancelInvestigationTask(hostId);
}
} catch (final Exception e) {
s_logger.error("Exception caught while handling investigation task: ", e);
}
}
}

private void scheduleInvestigationTask(final Long hostId) {
ScheduledFuture future = _investigateTasksMap.get(hostId);
if (future != null) {
s_logger.info("There is already a task to investigate host " + hostId);
} else {
ScheduledFuture scheduledFuture = _investigatorExecutor.scheduleWithFixedDelay(new InvestigationTask(hostId), InvestigateDisconnectedHostsInterval.value(),
InvestigateDisconnectedHostsInterval.value(), TimeUnit.SECONDS);
_investigateTasksMap.put(hostId, scheduledFuture);
s_logger.info("Scheduled a task to investigate host " + hostId);
}
}

private void cancelInvestigationTask(final Long hostId) {
ScheduledFuture future = _investigateTasksMap.get(hostId);
if (future != null) {
try {
future.cancel(false);
s_logger.info("Cancelled a task to investigate host " + hostId);
_investigateTasksMap.remove(hostId);
} catch (Exception e) {
s_logger.error("Exception caught while cancelling investigation task: ", e);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,15 @@ public Status isAgentAlive(Host agent) {
break;
}
}
if (!hasNfs) {
List<StoragePoolVO> zonePools = _storagePoolDao.findZoneWideStoragePoolsByHypervisor(agent.getDataCenterId(), agent.getHypervisorType());
for (StoragePoolVO pool : zonePools) {
if (pool.getPoolType() == StoragePoolType.NetworkFilesystem) {
hasNfs = true;
break;
}
}
}
if (!hasNfs) {
s_logger.warn(
"Agent investigation was requested on host " + agent + ", but host does not support investigation because it has no NFS storage. Skipping investigation.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,37 @@ public class KVMHABase {
private static final Logger s_logger = Logger.getLogger(KVMHABase.class);
private long _timeout = 60000; /* 1 minutes */
protected static String s_heartBeatPath;
protected long _heartBeatUpdateTimeout = 60000;
protected long _heartBeatUpdateFreq = 60000;
protected long _heartBeatUpdateMaxTries = 5;
protected long _heartBeatUpdateRetrySleep = 10000;
protected static long s_heartBeatUpdateTimeout = 60000;
protected static long s_heartBeatUpdateFreq = 60000;
protected static long s_heartBeatUpdateMaxRetries = 5;
protected static long s_heartBeatUpdateRetrySleep = 10000;
protected static HeartBeatAction s_heartBeatFailureAction = HeartBeatAction.HARDRESET;

public static enum PoolType {
PrimaryStorage, SecondaryStorage
}

public static enum HeartBeatAction {
HARDRESET("hardreset", "-c"),
DESTROYVMS("destroyvms", "-d"),
NOACTION("noaction", "-n"),
STOPAGENT("stopagent", "-s");

String _action;
String _flag;
HeartBeatAction(String action, String flag) {
_action = action;
_flag = flag;
}
@Override
public String toString() {
return _action;
}
public String getFlag() {
return _flag;
}
}

public static class NfsStoragePool {
String _poolUUID;
String _poolIp;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public Boolean checkingHeartBeat() {
cmd.add("-m", pool._mountDestPath);
cmd.add("-h", _hostIP);
cmd.add("-r");
cmd.add("-t", String.valueOf(_heartBeatUpdateFreq / 1000));
cmd.add("-t", String.valueOf(s_heartBeatUpdateFreq / 1000));
OutputInterpreter.OneLineParser parser = new OutputInterpreter.OneLineParser();
String result = cmd.execute(parser);
s_logger.debug("KVMHAChecker pool: " + pool._poolIp);
Expand Down
Loading