Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions examples/001-dotnet-WebClient/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -256,8 +256,7 @@ private static async Task AskSimpleQuestionStreamingTheAnswer()
Console.WriteLine($"Expected result: formula explanation using the information loaded");

Console.Write("\nAnswer: ");
var answerStream = s_memory.AskStreamingAsync(question, minRelevance: 0.6,
options: new SearchOptions { Stream = true });
var answerStream = s_memory.AskStreamingAsync(question, options: new SearchOptions { Stream = true });

await foreach (var answer in answerStream)
{
Expand Down
3 changes: 1 addition & 2 deletions examples/002-dotnet-Serverless/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -310,8 +310,7 @@ private static async Task AskSimpleQuestionStreamingTheAnswer()
Console.WriteLine($"Expected result: formula explanation using the information loaded");

Console.Write("\nAnswer: ");
var answerStream = s_memory.AskStreamingAsync(question, minRelevance: 0.6,
options: new SearchOptions { Stream = true });
var answerStream = s_memory.AskStreamingAsync(question, options: new SearchOptions { Stream = true });

await foreach (var answer in answerStream)
{
Expand Down
60 changes: 30 additions & 30 deletions service/Core/Pipeline/Queue/DevTools/SimpleQueues.cs
Original file line number Diff line number Diff line change
Expand Up @@ -89,17 +89,17 @@ public SimpleQueues(SimpleQueuesConfig config, ILoggerFactory? loggerFactory = n
switch (config.StorageType)
{
case FileSystemTypes.Disk:
this._log.LogTrace("Using {0} storage", nameof(DiskFileSystem));
this._log.LogTrace("Using {StorageType} storage", nameof(DiskFileSystem));
this._fileSystem = new DiskFileSystem(config.Directory, null, loggerFactory);
break;

case FileSystemTypes.Volatile:
this._log.LogTrace("Using {0} storage", nameof(VolatileFileSystem));
this._log.LogTrace("Using {StorageType} storage", nameof(VolatileFileSystem));
this._fileSystem = VolatileFileSystem.GetInstance(config.Directory, null, loggerFactory);
break;

default:
this._log.LogCritical("Unknown storage type {0}", config.StorageType);
this._log.LogCritical("Unknown storage type {StorageType}", config.StorageType);
throw new ArgumentException($"Unknown storage type {config.StorageType}");
}

Expand All @@ -124,15 +124,15 @@ public async Task<IQueue> ConnectToQueueAsync(string queueName, QueueOptions opt

if (!string.IsNullOrEmpty(this._queueName))
{
this._log.LogCritical("The client is already connected to queue {0}", this._queueName);
this._log.LogCritical("The client is already connected to queue {QueueName}", this._queueName);
throw new InvalidOperationException($"The queue is already connected to `{this._queueName}`");
}

this._queueName = queueName;
this._poisonQueueName = $"{queueName}{this._config.PoisonQueueSuffix}";
await this.CreateDirectoriesAsync(cancellationToken).ConfigureAwait(false);

this._log.LogTrace("Client connected to queue {0} and poison queue {1}", this._queueName, this._poisonQueueName);
this._log.LogTrace("Client connected to queue {QueueName} and poison queue {PoisonQueueName}", this._queueName, this._poisonQueueName);

if (options.DequeueEnabled)
{
Expand All @@ -144,11 +144,11 @@ public async Task<IQueue> ConnectToQueueAsync(string queueName, QueueOptions opt
this._dispatchTimer.Elapsed += this.DispatchMessage;
this._dispatchTimer.Start();

this._log.LogTrace("Queue {0}: polling and dispatching timers created", this._queueName);
this._log.LogTrace("Queue {QueueName}: polling and dispatching timers created", this._queueName);
}
else
{
this._log.LogTrace("Queue {0}: dequeue not enabled", this._queueName);
this._log.LogTrace("Queue {QueueName}: dequeue not enabled", this._queueName);
}

return this;
Expand All @@ -172,14 +172,14 @@ await this.StoreMessageAsync(
},
cancellationToken).ConfigureAwait(false);

this._log.LogInformation("Queue {0}: message {1} sent", this._queueName, messageId);
this._log.LogInformation("Queue {QueueName}: message {MessageId} sent", this._queueName, messageId);
}

/// <inheritdoc />
/// <see cref="DistributedPipelineOrchestrator.AddHandlerAsync"/> about the logic handling dequeued messages.
public void OnDequeue(Func<string, Task<ReturnType>> processMessageAction)
{
this._log.LogInformation("Queue {0}: subscribing...", this._queueName);
this._log.LogInformation("Queue {QueueName}: subscribing...", this._queueName);
this.Received += async (sender, args) =>
{
Message message = new();
Expand All @@ -190,34 +190,34 @@ public void OnDequeue(Func<string, Task<ReturnType>> processMessageAction)
ArgumentNullExceptionEx.ThrowIfNull(args.Message, nameof(args.Message), "The message received is NULL");
message = args.Message;

this._log.LogInformation("Queue {0}: message {0} received", this._queueName, message.Id);
this._log.LogInformation("Queue {QueueName}: message {MessageId} received", this._queueName, message.Id);

// Process message with the logic provided by the orchestrator
var returnType = await processMessageAction.Invoke(message.Content).ConfigureAwait(false);
switch (returnType)
{
case ReturnType.Success:
this._log.LogTrace("Message '{0}' successfully processed, deleting message", message.Id);
this._log.LogTrace("Message '{MessageId}' successfully processed, deleting message", message.Id);
await this.DeleteMessageAsync(message.Id, this._cancellation.Token).ConfigureAwait(false);
break;

case ReturnType.TransientError:
message.LastError = "Message handler returned false";
if (message.DequeueCount == this._maxAttempts)
{
this._log.LogError("Message '{0}' processing failed to process, max attempts reached, moving to poison queue. Message content: {1}", message.Id, message.Content);
this._log.LogError("Message '{MessageId}' processing failed to process, max attempts reached, moving to poison queue. Message content: {MessageContent}", message.Id, message.Content);
poison = true;
}
else
{
this._log.LogWarning("Message '{0}' failed to process, putting message back in the queue. Message content: {1}", message.Id, message.Content);
this._log.LogWarning("Message '{MessageId}' failed to process, putting message back in the queue. Message content: {MessageContent}", message.Id, message.Content);
retry = true;
}

break;

case ReturnType.FatalError:
this._log.LogError("Message '{0}' failed to process due to a non-recoverable error, moving to poison queue", message.Id);
this._log.LogError("Message '{MessageId}' failed to process due to a non-recoverable error, moving to poison queue", message.Id);
poison = true;
break;

Expand All @@ -228,7 +228,7 @@ public void OnDequeue(Func<string, Task<ReturnType>> processMessageAction)
catch (KernelMemoryException e) when (e.IsTransient.HasValue && !e.IsTransient.Value)
{
message.LastError = $"{e.GetType().FullName} [{e.InnerException?.GetType().FullName}]: {e.Message}";
this._log.LogError(e, "Message '{0}' failed to process due to a non-recoverable error, moving to poison queue.", message.Id);
this._log.LogError(e, "Message '{MessageId}' failed to process due to a non-recoverable error, moving to poison queue.", message.Id);
poison = true;
}
// Note: must catch all also because using a void event handler
Expand All @@ -237,12 +237,12 @@ public void OnDequeue(Func<string, Task<ReturnType>> processMessageAction)
message.LastError = $"{e.GetType().FullName}: {e.Message}";
if (message.DequeueCount == this._maxAttempts)
{
this._log.LogError(e, "Message '{0}' processing failed with exception, max attempts reached, moving to poison queue. Message content: {1}.", message.Id, message.Content);
this._log.LogError(e, "Message '{MessageId}' processing failed with exception, max attempts reached, moving to poison queue. Message content: {MessageContent}.", message.Id, message.Content);
poison = true;
}
else
{
this._log.LogWarning(e, "Message '{0}' processing failed with exception, putting message back in the queue. Message content: {1}.", message.Id, message.Content);
this._log.LogWarning(e, "Message '{MessageId}' processing failed with exception, putting message back in the queue. Message content: {MessageContent}.", message.Id, message.Content);
retry = true;
}
}
Expand Down Expand Up @@ -280,15 +280,15 @@ private void PopulateQueue(object? sender, ElapsedEventArgs elapsedEventArgs)
var messagesOnStorage = (await this._fileSystem.GetAllFileNamesAsync(this._queueName, "", this._cancellation.Token).ConfigureAwait(false)).ToList();
if (messagesOnStorage.Count == 0) { return; }

this._log.LogTrace("Queue {0}: {1} messages on storage, {2} ready to dispatch, max batch size {3}",
this._log.LogTrace("Queue {QueueName}: {MsgCountOnStorage} messages on storage, {MsgCountReady} ready to dispatch, max batch size {FetchBatchSize}",
this._queueName, messagesOnStorage.Count, this._queue.Count, this._config.FetchBatchSize);

foreach (var fileName in messagesOnStorage)
{
// Limit the number of messages loaded in memory
if (this._queue.Count >= this._config.FetchBatchSize)
{
this._log.LogTrace("Queue {0}: max batch size {1} reached", this._queueName, this._config.FetchBatchSize);
this._log.LogTrace("Queue {QueueName}: max batch size {FetchBatchSize} reached", this._queueName, this._config.FetchBatchSize);
return;
}

Expand All @@ -309,22 +309,22 @@ private void PopulateQueue(object? sender, ElapsedEventArgs elapsedEventArgs)

// Add to list of messages to be processed
this._queue.Enqueue(message);
this._log.LogTrace("Queue {0}: found message {1}", this._queueName, messageId);
this._log.LogTrace("Queue {QueueName}: found message {MessageId}", this._queueName, messageId);
}

if (this._log.IsEnabled(LogLevel.Trace))
{
if (!message.IsTimeToRun())
{
this._log.LogTrace("Queue {0}: skipping message {1} scheduled in the future", this._queueName, messageId);
this._log.LogTrace("Queue {QueueName}: skipping message {MessageId} scheduled in the future", this._queueName, messageId);
}
else if (message.IsLocked())
{
this._log.LogTrace("Queue {0}: skipping message {1} because it is locked", this._queueName, messageId);
this._log.LogTrace("Queue {QueueName}: skipping message {MessageId} because it is locked", this._queueName, messageId);
}
else if (this._queue.Any(x => x.Id == messageId))
{
this._log.LogTrace("Queue {0}: skipping message {1} because it is already loaded", this._queueName, messageId);
this._log.LogTrace("Queue {QueueName}: skipping message {MessageId} because it is already loaded", this._queueName, messageId);
}
}
}
Expand All @@ -336,7 +336,7 @@ private void PopulateQueue(object? sender, ElapsedEventArgs elapsedEventArgs)
}
catch (Exception e)
{
this._log.LogError(e, "Queue {0}: Unexpected error while polling.", this._queueName);
this._log.LogError(e, "Queue {QueueName}: Unexpected error while polling.", this._queueName);
}
finally
{
Expand All @@ -360,7 +360,7 @@ private void DispatchMessage(object? sender, ElapsedEventArgs e)

await s_lock.WaitAsync(this._cancellation.Token).ConfigureAwait(false);

this._log.LogTrace("Dispatching {0} messages", this._queue.Count);
this._log.LogTrace("Dispatching {MessageCount} messages", this._queue.Count);

while (this._queue.TryDequeue(out Message? message))
{
Expand All @@ -369,7 +369,7 @@ private void DispatchMessage(object? sender, ElapsedEventArgs e)
}
catch (Exception ex)
{
this._log.LogError(ex, "Queue {0}: Unexpected error while dispatching", this._queueName);
this._log.LogError(ex, "Queue {QueueName}: Unexpected error while dispatching", this._queueName);
}
finally
{
Expand All @@ -384,25 +384,25 @@ private void DispatchMessage(object? sender, ElapsedEventArgs e)

private async Task<Message> ReadMessageAsync(string id, CancellationToken cancellationToken = default)
{
this._log.LogTrace("Queue {0}: reading message {1}", this._queueName, id);
this._log.LogTrace("Queue {QueueName}: reading message {MessageId}", this._queueName, id);
var serializedMsg = await this._fileSystem.ReadFileAsTextAsync(
volume: this._queueName, relPath: "", fileName: $"{id}{FileExt}", cancellationToken: cancellationToken).ConfigureAwait(false);
return Deserialize(serializedMsg);
}

private async Task StoreMessageAsync(string queueName, Message message, CancellationToken cancellationToken = default)
{
this._log.LogTrace("Queue {0}: storing message {1}", this._queueName, message.Id);
this._log.LogTrace("Queue {QueueName}: storing message {MessageId}", this._queueName, message.Id);
await this._fileSystem.WriteFileAsync(queueName, "", $"{message.Id}{FileExt}", Serialize(message), cancellationToken).ConfigureAwait(false);
}

private async Task DeleteMessageAsync(string id, CancellationToken cancellationToken = default)
{
try
{
this._log.LogTrace("Queue {0}: deleting message {1}", this._queueName, id);
this._log.LogTrace("Queue {QueueName}: deleting message {MessageId}", this._queueName, id);
var fileName = $"{id}{FileExt}";
this._log.LogTrace("Deleting file from storage {0}", fileName);
this._log.LogTrace("Deleting file from storage {FileName}", fileName);
await this._fileSystem.DeleteFileAsync(this._queueName, "", fileName, cancellationToken).ConfigureAwait(false);
}
catch (DirectoryNotFoundException)
Expand Down
Loading