Real-Time Paris Metro Crowdedness Forecast: A Proof of Concept with Azure and C#

The Paris Métro is a convenient way to get around, but at peak hours crowded trains and scarce seating can make trips unpleasant. With predictive tools, commuters can plan their journeys ahead of time and sidestep the most packed trains.

In this piece, we walk through building a proof of concept for forecasting crowd levels on the Paris Métro using Azure, Microsoft's cloud platform, with step-by-step guidance and C# code examples to get you started.

Connect and create resources in Azure

The code below authenticates with Azure using the AuthenticationContext class from the Microsoft.IdentityModel.Clients.ActiveDirectory namespace, then creates a DataFactoryManagementClient instance from the Microsoft.Azure.Management.DataFactory namespace to work with Azure Data Factory.

It then creates a data factory, registers the Azure Blob Storage linked service that the datasets reference, defines input and output datasets with the AzureBlobDataset class, and builds a pipeline with a copy activity that moves data from the input dataset to the output dataset.

This code can serve as a starting point for designing your own data workflows in Azure Data Factory. Adapt it to your needs by substituting your own values for tenantId, clientId, clientSecret, subscriptionId, factoryName, resourceGroup, and storageConnectionString.

using System;
using System.Collections.Generic;
using Microsoft.Azure.Management.DataFactory;
using Microsoft.Azure.Management.DataFactory.Models;
using Microsoft.IdentityModel.Clients.ActiveDirectory;
using Microsoft.Rest;

namespace DataFactoryExample
{
    class Program
    {
        static void Main()
        {
            // Authenticate and create a DataFactoryManagementClient
            var client = AuthenticateDataFactoryClient("tenantId", "clientId", "clientSecret", "subscriptionId");
            // Define resource group and factory name
            var resourceGroup = "dev-rgp";
            var factoryName = "dev-adf";
            // Create Data Factory, linked service, and datasets
            CreateDataFactory(client, resourceGroup, factoryName);
            CreateLinkedService(client, resourceGroup, factoryName, "storageConnectionString");
            CreateDatasets(client, resourceGroup, factoryName);
            // Create pipeline
            CreatePipeline(client, resourceGroup, factoryName);
            // Create a pipeline run
            CreatePipelineRun(client, resourceGroup, factoryName);
        }
        // Authenticate and create DataFactoryManagementClient
        static DataFactoryManagementClient AuthenticateDataFactoryClient(string tenantId, string clientId, string clientSecret, string subscriptionId)
        {
            var context = new AuthenticationContext("https://login.windows.net/" + tenantId);
            ClientCredential clientCredential = new ClientCredential(clientId, clientSecret);
            AuthenticationResult authenticationResult = context.AcquireTokenAsync("https://management.azure.com/", clientCredential).Result;
            ServiceClientCredentials cred = new TokenCredentials(authenticationResult.AccessToken);
            var client = new DataFactoryManagementClient(cred) { SubscriptionId = subscriptionId };
            return client;
        }
        // Create Data Factory
        static void CreateDataFactory(DataFactoryManagementClient client, string resourceGroup, string factoryName)
        {
            var dataFactory = new Factory { Location = "francecentral" };
            client.Factories.CreateOrUpdate(resourceGroup, factoryName, dataFactory);
            Console.WriteLine("Data Factory created successfully.");
        }
        // Create the Azure Blob Storage linked service referenced by the datasets
        static void CreateLinkedService(DataFactoryManagementClient client, string resourceGroup, string factoryName, string storageConnectionString)
        {
            var linkedService = new LinkedServiceResource(
                new AzureStorageLinkedService { ConnectionString = new SecureString(storageConnectionString) }
            );
            client.LinkedServices.CreateOrUpdate(resourceGroup, factoryName, "AzureBlobStorageLinkedService", linkedService);
            Console.WriteLine("Linked service created successfully.");
        }
        // Create Input and Output datasets
        static void CreateDatasets(DataFactoryManagementClient client, string resourceGroup, string factoryName)
        {
            CreateDataset(client, resourceGroup, factoryName, "input", "input.txt", "InputDataset");
            CreateDataset(client, resourceGroup, factoryName, "output", "output.txt", "OutputDataset");
            Console.WriteLine("Datasets created successfully.");
        }
        // Helper method to create dataset
        static void CreateDataset(DataFactoryManagementClient client, string resourceGroup, string factoryName, string blobPath, string blobFilename, string datasetName)
        {
            var dataset = new DatasetResource(
                new AzureBlobDataset
                {
                    LinkedServiceName = new LinkedServiceReference { ReferenceName = "AzureBlobStorageLinkedService" },
                    FolderPath = blobPath,
                    FileName = blobFilename,
                    Format = new TextFormat()
                }
            );
            client.Datasets.CreateOrUpdate(resourceGroup, factoryName, datasetName, dataset);
        }
        // Create pipeline with a copy activity
        static void CreatePipeline(DataFactoryManagementClient client, string resourceGroup, string factoryName)
        {
            var pipelineName = "Prediction_Pipeline";
            var pipeline = new PipelineResource
            {
                Activities = new List<Activity>
                {
                    new CopyActivity
                    {
                        Name = "CopyFromBlobToBlob",
                        Inputs = new List<DatasetReference> { new DatasetReference() { ReferenceName = "InputDataset" } },
                        Outputs = new List<DatasetReference> { new DatasetReference() { ReferenceName = "OutputDataset" } },
                        Source = new BlobSource(),
                        Sink = new BlobSink()
                    }
                }
            };
            client.Pipelines.CreateOrUpdate(resourceGroup, factoryName, pipelineName, pipeline);
            Console.WriteLine("Pipeline created successfully.");
        }
        // Create pipeline run
        static void CreatePipelineRun(DataFactoryManagementClient client, string resourceGroup, string factoryName)
        {
            CreateRunResponse runResponse = client.Pipelines.CreateRunWithHttpMessagesAsync(resourceGroup, factoryName, "Prediction_Pipeline").Result.Body;
            Console.WriteLine("Pipeline run created with ID: " + runResponse.RunId);
        }
    }
}

Train a machine learning model

Next, you need to train a machine learning model on the accumulated data. In this proof of concept we use ML.NET, Microsoft's open-source machine learning framework for .NET applications, which fits naturally into a C# codebase and whose models can be containerized and hosted on Azure.

Below is an example of training a multiclass classification model in C# with ML.NET.

using System;
using Microsoft.ML;
using Microsoft.ML.Data;

public class MetroTravelData
{
    [LoadColumn(0)]
    public float NumberOfPassengers;
    
    [LoadColumn(1)]
    public float TimeOfDay;
    
    [LoadColumn(2)]
    public float DayOfWeek;
    
    [LoadColumn(3)]
    public bool IsHoliday;
    
    [LoadColumn(4)]
    public string LevelOfCrowdedness;
}
// Define the input for model prediction
public class MetroTravelInput
{
    public float NumberOfPassengers;
    public float TimeOfDay;
    public float DayOfWeek;
    public bool IsHoliday;
}
// Define the output after model prediction
public class MetroTravelOutput
{
    [ColumnName("PredictedLabel")]
    public string PredictedCrowdednessLevel;
}
class Program
{
    static void Main()
    {
        // Create MLContext
        var mlContext = new MLContext();
        // Load the dataset
        var dataset = mlContext.Data.LoadFromTextFile<MetroTravelData>("data.csv", separatorChar: ',');
        // Split the dataset into training and testing sets
        var trainTestSplit = mlContext.Data.TrainTestSplit(dataset);
        var trainingData = trainTestSplit.TrainSet;
        var testingData = trainTestSplit.TestSet;
        // Define the data processing pipeline: map the string label to a key,
        // convert the boolean flag to a float so it can be concatenated,
        // then assemble and normalize the feature vector
        var dataProcessingPipeline = mlContext.Transforms.Conversion.MapValueToKey("Label", "LevelOfCrowdedness")
            .Append(mlContext.Transforms.Conversion.ConvertType("IsHolidayNumeric", "IsHoliday", DataKind.Single))
            .Append(mlContext.Transforms.Concatenate("Features", "NumberOfPassengers", "TimeOfDay", "DayOfWeek", "IsHolidayNumeric"))
            .Append(mlContext.Transforms.NormalizeMinMax("Features"));
        // Define the training algorithm
        var classificationTrainer = mlContext.MulticlassClassification.Trainers.SdcaNonCalibrated(labelColumnName: "Label", featureColumnName: "Features");

        var trainingPipeline = dataProcessingPipeline
            .Append(classificationTrainer)
            .Append(mlContext.Transforms.Conversion.MapKeyToValue("PredictedLabel"));
        // Train the model
        ITransformer trainedModel = trainingPipeline.Fit(trainingData);
        // Evaluate the model on the held-out test set
        var modelPredictions = trainedModel.Transform(testingData);
        var modelMetrics = mlContext.MulticlassClassification.Evaluate(modelPredictions);
        Console.WriteLine($"Macro accuracy: {modelMetrics.MacroAccuracy:0.###}");
        // Save the trained model so it can be packaged and served later
        mlContext.Model.Save(trainedModel, trainingData.Schema, "model.zip");
    }
}

This script uses the MLContext class from the Microsoft.ML namespace to set up the machine learning environment and loads the data from a CSV file into an IDataView with the LoadFromTextFile method.

The dataset is then split into training and test subsets with the TrainTestSplit method. A data processing pipeline is built from transformations in the Microsoft.ML.Transforms namespace: the string label is mapped to a key, the boolean holiday flag is converted to a float, and the feature columns are concatenated and normalized.

Training uses the SdcaNonCalibrated trainer from the Microsoft.ML.Trainers namespace, appended to the processing pipeline and fitted to the training data.

Finally, the trained model's performance is assessed on the test data with the Transform and Evaluate methods, and the model is saved to model.zip for later deployment.

This code serves as a foundational guide for getting started with machine learning in C# using ML.NET.
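
To see how the MetroTravelOutput class defined earlier comes into play, here is a minimal single-row inference sketch continuing from the training code above and using ML.NET's PredictionEngine. The sample values are purely illustrative; MetroTravelData is reused as the input type so the pipeline's label-mapping step still finds the LevelOfCrowdedness column it expects.

// Minimal inference sketch (sample values are hypothetical)
var predictionEngine = mlContext.Model.CreatePredictionEngine<MetroTravelData, MetroTravelOutput>(trainedModel);
var sample = new MetroTravelData
{
    NumberOfPassengers = 250f, // hypothetical observation
    TimeOfDay = 8.5f,          // 08:30, morning rush
    DayOfWeek = 1f,            // Monday
    IsHoliday = false
};
MetroTravelOutput prediction = predictionEngine.Predict(sample);
Console.WriteLine($"Predicted crowdedness: {prediction.PredictedCrowdednessLevel}");

Note that PredictionEngine is not thread-safe; a web service would typically wrap it in a PredictionEnginePool from the Microsoft.Extensions.ML package.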

Deploy the model to Azure

After you've trained your machine learning model, the next step is to deploy it so it can serve real-time predictions. You can use either Azure Kubernetes Service (AKS) or Azure Container Instances (ACI) to run the model as a containerized application.

Here is an example of provisioning such a container with C# and the Azure Container Instances management SDK.

using System.Collections.Generic;
using Microsoft.Azure.Management.ContainerInstance;
using Microsoft.Azure.Management.ContainerInstance.Models;
using Microsoft.Azure.Management.ResourceManager.Fluent;
using Microsoft.Azure.Management.ResourceManager.Fluent.Core;

class Program
{
    static void Main()
    {
        // Authenticate and initialize the Container Instance Management Client
        var containerInstanceClient = InitializeContainerInstanceClient("clientId", "clientSecret", "tenantId", "subscriptionId");
        // Define container group details
        var containerGroupName = "container-group";
        var resourceGroupName = "dev-rgp";
        var containerGroupDetails = CreateContainerGroup();
        // Create or update the container group
        containerInstanceClient.ContainerGroups.CreateOrUpdate(resourceGroupName, containerGroupName, containerGroupDetails);
    }
    static ContainerInstanceManagementClient InitializeContainerInstanceClient(string clientId, string clientSecret, string tenantId, string subscriptionId)
    {
        var credentials = SdkContext.AzureCredentialsFactory.FromServicePrincipal(clientId, clientSecret, tenantId, AzureEnvironment.AzureGlobalCloud);
        return new ContainerInstanceManagementClient(credentials) { SubscriptionId = subscriptionId };
    }
    static ContainerGroup CreateContainerGroup()
    {
        return new ContainerGroup
        {
            Location = "francecentral",
            Containers = new List<Container>
            {
                new Container
                {
                    Name = "image-container",
                    Image = "image", // replace with your model-serving container image
                    Resources = new ResourceRequirements
                    {
                        Requests = new ResourceRequests
                        {
                            Cpu = 1,
                            MemoryInGB = 1.5
                        }
                    },
                    Ports = new List<ContainerPort>
                    {
                        new ContainerPort(80)
                    }
                }
            },
            OsType = OperatingSystemTypes.Linux,
            IpAddress = new IpAddress
            {
                Type = "Public",
                Ports = new List<Port>
                {
                    new Port(80)
                }
            }
        };
    }
}

You can employ this code as a starting point for deploying your machine learning models with C# and Azure Container Instances. Adapt it to your specific requirements by supplying your own values for clientId, clientSecret, tenantId, subscriptionId, containerGroupName, resourceGroupName, and Image.
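
For the container image itself, the model has to be exposed behind an HTTP endpoint for real-time scoring. Below is a minimal sketch of such a service, assuming an ASP.NET Core minimal API project (Microsoft.NET.Sdk.Web), the model.zip file saved during training, and the MetroTravelData and MetroTravelOutput classes from the training example; treat it as one possible design rather than a prescribed implementation.

using Microsoft.ML;

var builder = WebApplication.CreateBuilder(args);
// MetroTravelData uses public fields, so enable field support in JSON binding
builder.Services.ConfigureHttpJsonOptions(o => o.SerializerOptions.IncludeFields = true);
var app = builder.Build();

// Load the model saved during training (model.zip is baked into the image)
var mlContext = new MLContext();
ITransformer model = mlContext.Model.Load("model.zip", out _);
var engine = mlContext.Model.CreatePredictionEngine<MetroTravelData, MetroTravelOutput>(model);

// Score one observation per request (a POC shortcut: PredictionEngine is not thread-safe)
app.MapPost("/predict", (MetroTravelData input) => engine.Predict(input));

// Listen on port 80 to match the container group's port configuration
app.Run("http://0.0.0.0:80");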

Establish a streaming pipeline

To keep your predictions current, you need a streaming pipeline that processes incoming data and updates predictions as required. You can use either Azure Stream Analytics or Azure Databricks to build this pipeline.

Below is an example of creating a streaming pipeline in C# with the Azure Stream Analytics management SDK.

using System.Collections.Generic;
using Microsoft.Azure.Management.StreamAnalytics;
using Microsoft.Azure.Management.StreamAnalytics.Models;
using Microsoft.IdentityModel.Clients.ActiveDirectory;
using Microsoft.Rest;

class Program
{
    static void Main()
    {
        var tenantID = "";
        var applicationId = "";
        var authenticationKey = "";
        var subscriptionId = "";
        var resourceGroup = "dev-rgp";
        var jobName = "job";
        var analyticsManagementClient = AuthenticateAndCreateClient(tenantID, applicationId, authenticationKey, subscriptionId);
        CreateStreamAnalyticsJob(analyticsManagementClient, resourceGroup, jobName);
        DefineInputTransformationOutput(analyticsManagementClient, resourceGroup, jobName);
        StartStreamAnalyticsJob(analyticsManagementClient, resourceGroup, jobName);
    }
    static StreamAnalyticsManagementClient AuthenticateAndCreateClient(string tenantID, string applicationId, string authenticationKey, string subscriptionId)
    {
        var context = new AuthenticationContext($"https://login.windows.net/{tenantID}");
        var clientCredential = new ClientCredential(applicationId, authenticationKey);
        var result = context.AcquireTokenAsync("https://management.azure.com/", clientCredential).Result;
        var tokenCredentials = new TokenCredentials(result.AccessToken);
        return new StreamAnalyticsManagementClient(tokenCredentials) { SubscriptionId = subscriptionId };
    }
    static void CreateStreamAnalyticsJob(StreamAnalyticsManagementClient client, string resourceGroup, string jobName)
    {
        var job = new StreamingJob
        {
            Location = "francecentral",
            Sku = new Sku { Name = "Standard" },
            EventsOutOfOrderPolicy = EventsOutOfOrderPolicy.Adjust,
            OutputStartMode = OutputStartMode.JobStartTime,
            EventsOutOfOrderMaxDelayInSeconds = 0,
        };
        // In the Stream Analytics management SDK, the resource body is passed first
        client.StreamingJobs.CreateOrReplace(job, resourceGroup, jobName);
    }
    static void DefineInputTransformationOutput(StreamAnalyticsManagementClient client, string resourceGroup, string jobName)
    {
        // Input Definition
        var inputName = "input";
        var input = new Input
        {
            Properties = new StreamInputProperties
            {
                Serialization = new JsonSerialization { Encoding = Encoding.UTF8 },
                Datasource = new BlobStreamInputDataSource
                {
                    StorageAccounts = new List<StorageAccount>
                    {
                        new StorageAccount { AccountName = "", AccountKey = "" }
                    },
                    Container = "container",
                    PathPattern = "{date}/{time}",
                    DateFormat = "yyyy/MM/dd",
                    TimeFormat = "HH"
                }
            }
        };
        client.Inputs.CreateOrReplace(input, resourceGroup, jobName, inputName);
        // Transformation Definition
        var transformationName = "transformation";
        var transformationQuery = @"
            SELECT
                AVG(crowdedness) AS avgCrowdedness,
                metroId,
                System.Timestamp AS time
            INTO
                output
            FROM
                input TIMESTAMP BY time
            GROUP BY
                metroId,
                TumblingWindow(minute, 1)";
        var transformation = new Transformation { Query = transformationQuery, StreamingUnits = 1 };
        client.Transformations.CreateOrReplace(transformation, resourceGroup, jobName, transformationName);
        // Output Definition (placeholder values; supply your own Azure SQL details)
        var outputName = "output";
        var output = new Output
        {
            Datasource = new AzureSqlDatabaseOutputDataSource
            {
                Server = "yourSqlServer",
                Database = "yourDatabase",
                User = "yourSqlUser",
                Password = "yourSqlPassword",
                Table = "yourOutputTable"
            }
        };
        client.Outputs.CreateOrReplace(output, resourceGroup, jobName, outputName);
    }
    static void StartStreamAnalyticsJob(StreamAnalyticsManagementClient client, string resourceGroup, string jobName)
    {
        client.StreamingJobs.Start(resourceGroup, jobName);
    }
}
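
For reference, the query above assumes incoming blob events shaped roughly like the following; the field names (metroId, crowdedness, time) match the query, but the exact schema depends on your ingestion process:

{ "metroId": "M1", "crowdedness": 0.82, "time": "2024-05-06T08:30:00Z" }

Each one-minute tumbling window then produces one averaged crowdedness row per metroId, which the job writes to the SQL output.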

Incorporate forecasts into an application for end-users

Once your streaming pipeline is operational, you can start surfacing real-time crowdedness forecasts in a consumer-facing application.

For example, you might build a smartphone application that uses Azure Maps to display live crowdedness data on a map. Commuters could use it to see which lines or stations are congested and plan their trips accordingly.

Here's an example of how you might render live crowdedness data on a map in C#, using the community AzureMapsToolkit package, which wraps the Azure Maps REST APIs.

using AzureMapsToolkit;
using AzureMapsToolkit.Common;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

class Program
{
    static async Task Main()
    {
        var subscriptionKey = "";
        var mapsServices = new AzureMapsServices(subscriptionKey);
        var crowdednessData = GetCrowdednessData();
        var featureCollection = CreateFeatureCollection(crowdednessData);
        var response = await RenderMapImageAsync(mapsServices, featureCollection);
    }
    static List<CrowdednessData> GetCrowdednessData()
    {
        // Implement the logic to retrieve real-time crowdedness data,
        // e.g. by querying the SQL table written by the Stream Analytics job
        return new List<CrowdednessData>();
    }
    static FeatureCollection CreateFeatureCollection(List<CrowdednessData> crowdednessData)
    {
        return new FeatureCollection
        {
            Features = crowdednessData.Select(data => new Feature
            {
                Geometry = new Point
                {
                    Coordinates = new double[] { data.Longitude, data.Latitude }
                },
                Properties = new Dictionary<string, object>
                {
                    { "crowdednessLevel", data.CrowdednessLevel }
                }
            }).ToList()
        };
    }
    static async Task<RenderGetMapImageResult> RenderMapImageAsync(AzureMapsServices mapsServices, FeatureCollection featureCollection)
    {
        return await mapsServices.Render.GetMapImage(new RenderGetMapImageRequest
        {
            Format = "png",
            Layer = "basic",
            Style = "main",
            Zoom = 12,
            Center = "0,0",
            Width = 600,
            Height = 400,
            GeoJson = featureCollection.ToJson()
        });
    }
}
class CrowdednessData
{
    public double Longitude { get; set; }
    public double Latitude { get; set; }
    public int CrowdednessLevel { get; set; }
}

Conclusion

Creating a proof of concept (POC) to forecast metro crowdedness in real time with Azure and C# involves several stages. First, you gather and prepare the data used to train your machine learning model. Next, you deploy that model and set up a streaming pipeline to process real-time data and refresh predictions as needed. Finally, you integrate the predictions into an application that metro passengers can use to get real-time crowdedness updates.

By following this approach, you can build a solution that leverages Azure and C# to deliver accurate, timely crowdedness forecasts.