Generating Dummy Data for Event Hubs or Blob Storage

I was working on this as part of another project and thought I would share. One of the most annoying parts of building data pipelines is getting hold of test data to verify that the pipeline produces the right results.

It's nothing groundbreaking, but it might be useful for anyone trying to push data into a pipeline, whether that's Blob Storage or Event Hubs.

What I did was build a small generic utility that writes text files full of JSON objects, plus a separate piece that parses those files and puts the records onto an event hub.

For this example, I decoupled the file generation from the Event Hubs code to get more reuse out of it, and implemented the generator as a .NET Core console application. Below is the method that generates the files and uploads them to Blob Storage:

static void Main(string[] args)
        {
            var builder = new ConfigurationBuilder()
                .SetBasePath(Directory.GetCurrentDirectory())
                .AddJsonFile("appsettings.json", optional: true, reloadOnChange: true);
            var configuration = builder.Build();

            var appSettings = new AppSettings();

            ConfigurationBinder.Bind(configuration.GetSection("AppSettings"), appSettings);

            for (var f = 0; f < appSettings.NumberOfFiles; f++)
            {
                // HH is the 24-hour clock; hh would repeat the same file name 12 hours later
                var fileName = $"{appSettings.FilePrefix}-{f}-{ DateTime.Now.ToString("MM-dd-yyyy-HH-mm-ss")}.txt";

                Console.WriteLine("-----------------------------------------------------------------------");
                Console.WriteLine($"Creating file - {fileName}");
                Console.WriteLine("-----------------------------------------------------------------------");
                Console.WriteLine("");

                //Create records for entry
                var list = new List<LogEntryModel>();
                for (var x = 0; x < appSettings.MaxNumberOfRecords; x++)
                {
                    var logEntry = new LogEntryModel();

                    logEntry.LogDateTime = DateTime.Now;
                    logEntry.LogMessage = $"Test { x } - { DateTime.Now.ToString("MM-dd-yyyy-hh-mm-ss")}";
                    logEntry.SequenceNumber = x;

                    list.Add(logEntry);
                    Console.WriteLine($"Creating line entry - { logEntry.LogMessage}");

                    var randomTime = RandomNumber(1, appSettings.MaxWaitBetweenEntries);

                    Console.WriteLine($"Thread sleep for { randomTime }");
                    Thread.Sleep(randomTime);
                    Console.WriteLine($"Sleep over - Processing file");
                }

                var filePath = $@"C:\temp\{fileName}";
                //Create text file
                using (StreamWriter file = File.CreateText(filePath))
                {
                    JsonSerializer serializer = new JsonSerializer();
                    serializer.Serialize(file, list);
                    Console.WriteLine("Pushing Json to file");
                    Console.WriteLine("");
                }

                //Push to blob storage
                BlobServiceClient blobServiceClient = new BlobServiceClient(appSettings.BlobConnectionString);

                //Name of the target container (fixed, not unique per run)
                string containerName = "logs";

                // Get a container client and create the container if it does not exist yet
                var containerClient = blobServiceClient.GetBlobContainerClient(containerName);
                containerClient.CreateIfNotExists();

                BlobClient blobClient = containerClient.GetBlobClient(fileName);

                Console.WriteLine("Pushing File to Blob Storage");
                Console.WriteLine("");
                //Scope the stream so the file handle is released before the file is deleted below
                using (FileStream uploadFile = File.OpenRead(filePath))
                {
                    //Main is synchronous, so block until the upload completes (true = overwrite)
                    blobClient.UploadAsync(uploadFile, true).GetAwaiter().GetResult();
                }

                Console.WriteLine("File Uploaded to Blob storage");
                Console.WriteLine("");

                var randomFileTime = RandomNumber(1, appSettings.MaxWaitBetweenFiles);
                Console.WriteLine($"Thread going to sleep for - { randomFileTime}");
                Thread.Sleep(randomFileTime);
                Console.WriteLine("Thread sleep done, moving onto next file");
                Console.WriteLine("");

                Console.WriteLine($"Started Deleting file {filePath}");
                File.Delete(filePath);
                Console.WriteLine($"Finished Deleting file {filePath}");
            }

            Console.WriteLine("All Files Processed and uploaded.");

            Console.ReadLine();
        }

Besides staggering the entries with random waits, it writes its progress to the console in an easy-to-read format. Below is the method I use to generate the random numbers:

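// Shared Random instance used by RandomNumber
static readonly Random _random = new Random();

static int RandomNumber(int min, int max)
        {
            // min is inclusive, max is exclusive
            return _random.Next(min, max);
        }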
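
One thing to watch with this helper: Random.Next(min, max) treats max as an exclusive upper bound, and it throws an ArgumentOutOfRangeException when min is greater than max. That is what happens if one of the wait settings is missing from appsettings.json and the bound int stays at 0. Below is a minimal sketch of a more forgiving variant. The names RandomHelper and SafeRandomNumber, and the choice to fall back to min, are my assumptions and not part of the original utility.

```csharp
using System;

public static class RandomHelper
{
    private static readonly Random _random = new Random();

    // Hypothetical variant of RandomNumber: returns min when max <= min
    // instead of throwing, otherwise a value in the range [min, max).
    public static int SafeRandomNumber(int min, int max)
    {
        if (max <= min)
        {
            return min;
        }

        return _random.Next(min, max);
    }
}
```

With that in place, a call such as Thread.Sleep(RandomHelper.SafeRandomNumber(1, appSettings.MaxWaitBetweenEntries)) sleeps for 1 ms when the setting is absent rather than crashing the run.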

Overall nothing too special, but it at least gives you an easy way to generate the JSON objects you need to pump through a data pipeline.
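
The Main method binds an AppSettings class that isn't shown in this post. Here is a rough sketch of what it might look like: the property names are taken from how Main uses them, but the types, the sample values, and the SettingsDemo helper are assumptions on my part. It loads the values from an in-memory collection so that it runs without an appsettings.json file; the "AppSettings:" key prefix corresponds to an "AppSettings" object in that file.

```csharp
using System;
using System.Collections.Generic;
using Microsoft.Extensions.Configuration;

// Assumed shape of the settings class; property names come from the Main
// method, the types are guesses based on how each value is used there.
public class AppSettings
{
    public int NumberOfFiles { get; set; }
    public string FilePrefix { get; set; }
    public int MaxNumberOfRecords { get; set; }
    public int MaxWaitBetweenEntries { get; set; } // milliseconds, passed to Thread.Sleep
    public int MaxWaitBetweenFiles { get; set; }   // milliseconds, passed to Thread.Sleep
    public string BlobConnectionString { get; set; }
}

public static class SettingsDemo
{
    // Hypothetical helper: binds made-up sample values the same way Main binds the JSON file.
    public static AppSettings LoadSample()
    {
        var values = new Dictionary<string, string>
        {
            ["AppSettings:NumberOfFiles"] = "2",
            ["AppSettings:FilePrefix"] = "testlog",
            ["AppSettings:MaxNumberOfRecords"] = "10",
            ["AppSettings:MaxWaitBetweenEntries"] = "500",
            ["AppSettings:MaxWaitBetweenFiles"] = "2000",
            // Points at the local storage emulator rather than a real account
            ["AppSettings:BlobConnectionString"] = "UseDevelopmentStorage=true"
        };

        var configuration = new ConfigurationBuilder()
            .AddInMemoryCollection(values)
            .Build();

        var appSettings = new AppSettings();
        configuration.GetSection("AppSettings").Bind(appSettings);
        return appSettings;
    }
}
```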

Below is all I use for a data model, but it could easily be swapped for any data model you like, with some random elements added:

public class LogEntryModel
    {
        public DateTime LogDateTime { get; set; }
        public string LogMessage { get; set; }
        public int SequenceNumber { get; set; }
    }
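
To give an idea of what swapping in a different model with random elements could look like, here is a small sketch. Everything in it (SensorReadingModel, SensorReadingFactory, the device IDs, the temperature range) is made up for illustration and does not come from the original project.

```csharp
using System;

// Hypothetical alternative model; none of these names come from the original utility.
public class SensorReadingModel
{
    public DateTime ReadingDateTime { get; set; }
    public string DeviceId { get; set; }
    public double Temperature { get; set; }
    public int SequenceNumber { get; set; }
}

public static class SensorReadingFactory
{
    private static readonly string[] DeviceIds = { "device-a", "device-b", "device-c" };

    // Builds one reading with a randomly chosen device and temperature.
    public static SensorReadingModel Create(Random random, int sequenceNumber)
    {
        return new SensorReadingModel
        {
            ReadingDateTime = DateTime.UtcNow,
            // Next(n) returns an index from 0 to n - 1
            DeviceId = DeviceIds[random.Next(DeviceIds.Length)],
            // NextDouble() is in [0, 1), so this stays between 15 and 35
            Temperature = 15.0 + random.NextDouble() * 20.0,
            SequenceNumber = sequenceNumber
        };
    }
}
```

In the generator loop, list.Add(SensorReadingFactory.Create(_random, x)) would take the place of the LogEntryModel construction, with the list typed as List<SensorReadingModel>.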

On the back end, I needed to take these blob files, parse them, and send the entries to Event Hubs. I did so with the following:

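using (var sr = new StreamReader(logFile, Encoding.UTF8))
            {
                var str = sr.ReadToEnd();

                // Fall back to an empty list if the blob is empty or contains "null"
                var logs = JsonConvert.DeserializeObject<List<LogEntryModel>>(str) ?? new List<LogEntryModel>();

                await using (var producerClient = new EventHubProducerClient(connectionString, hubName))
                {
                    EventDataBatch eventBatch = await producerClient.CreateBatchAsync();
                    try
                    {
                        foreach (var logEntry in logs)
                        {
                            var txt = JsonConvert.SerializeObject(logEntry);
                            var eventData = new EventData(Encoding.UTF8.GetBytes(txt));

                            // TryAdd returns false when the batch is full: send it and start a new one
                            if (!eventBatch.TryAdd(eventData))
                            {
                                await producerClient.SendAsync(eventBatch);
                                eventBatch.Dispose();
                                eventBatch = await producerClient.CreateBatchAsync();

                                if (!eventBatch.TryAdd(eventData))
                                {
                                    throw new InvalidOperationException("A single log entry is too large for an empty batch.");
                                }
                            }
                        }

                        // Send whatever is left in the final batch
                        if (eventBatch.Count > 0)
                        {
                            await producerClient.SendAsync(eventBatch);
                        }
                    }
                    finally
                    {
                        eventBatch.Dispose();
                    }

                    log.LogInformation($"Log of {name} with {logs.Count} rows processed.");
                }
            }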
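
If you want to check the parsing side without an Event Hubs namespace to hand, a quick round trip through Json.NET is enough to confirm that what the generator writes can be read back. This is only a sketch: RoundTripDemo is a hypothetical helper, and it uses JsonConvert.SerializeObject in place of the JsonSerializer that writes to the file, which should produce the same JSON array as long as no custom serializer settings are involved.

```csharp
using System;
using System.Collections.Generic;
using Newtonsoft.Json;

// Same model as in the generator, repeated so the sketch is self-contained.
public class LogEntryModel
{
    public DateTime LogDateTime { get; set; }
    public string LogMessage { get; set; }
    public int SequenceNumber { get; set; }
}

public static class RoundTripDemo
{
    // Hypothetical helper: builds entries, serializes them, and parses them back.
    public static List<LogEntryModel> RoundTrip(int count)
    {
        var list = new List<LogEntryModel>();
        for (var x = 0; x < count; x++)
        {
            list.Add(new LogEntryModel
            {
                LogDateTime = DateTime.Now,
                LogMessage = $"Test {x}",
                SequenceNumber = x
            });
        }

        // The generator writes the whole list as a single JSON array
        var json = JsonConvert.SerializeObject(list);

        // Same call the parsing code uses on the blob contents
        return JsonConvert.DeserializeObject<List<LogEntryModel>>(json);
    }
}
```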

Anyway, I hope you find this helpful to get data pushed into your pipeline.
