
Tuesday, August 14, 2012

First StreamInsight attempt using Performance Counter

As my first attempt with StreamInsight, I decided to use a performance counter as my stream of data. Here is a brief overview of implementing the StreamInsight portion.

Important downloads to get started with StreamInsight:

Download Reactive Extensions (Rx): http://www.microsoft.com/en-us/download/confirmation.aspx?id=28568
Download Examples: http://streaminsight.codeplex.com/releases/view/90143


First I created a class to represent the data I wanted to capture in my stream. Not all data types can be used with StreamInsight (e.g. enum). Here is a list of supported data types: http://msdn.microsoft.com/en-us/library/ee378905.aspx

Code Snippet
    // Input events of CounterSample, using only supported data types
    public class StreamableCounterSample
    {
        public float CpuUtilization { get; set; }
        public long TimeStamp { get; set; }
    }
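
Because enum is not on the supported list, one possible workaround is to carry the enum as its underlying integer in the payload and convert it back once the event has left the stream. The sketch below is only an illustration of that idea: CounterKind, KindedCounterSample, and PayloadMapper are hypothetical names that do not appear in the original program.

```csharp
using System;

// Hypothetical enum; a field of this type cannot be used directly in a payload class.
public enum CounterKind { Cpu = 0, Memory = 1 }

// Hypothetical payload that uses only primitive field types.
public class KindedCounterSample
{
    public float Value { get; set; }
    public long TimeStamp { get; set; }
    public int Kind { get; set; } // holds (int)CounterKind
}

public static class PayloadMapper
{
    // Build a payload, storing the enum as an int and the time as a file time.
    public static KindedCounterSample ToPayload(CounterKind kind, float value, DateTime time)
    {
        return new KindedCounterSample
        {
            Kind = (int)kind,
            Value = value,
            TimeStamp = time.ToFileTime()
        };
    }

    // Recover the enum after the event leaves the stream.
    public static CounterKind KindOf(KindedCounterSample sample)
    {
        return (CounterKind)sample.Kind;
    }
}
```

Keeping the conversion in a separate helper leaves the payload class as a plain bag of supported fields.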


The main portion of the program.
Here, starting with the Server.Create call, I create my StreamInsight server embedded in memory. The server name is the instance name that you chose when installing StreamInsight. StreamInsight 2.1 added IQStreamable, which allows you to query the stream. The stream is queried for results inside the while loop, where perf and timestamp are assigned.
Code Snippet
    private float CalculatePerformance(BackgroundWorker worker, DoWorkEventArgs e)
    {
        CreateCounters();

        // Embedded (in-memory) server; the name is the StreamInsight instance name
        using (Server server = Server.Create("StreamInsight21"))
        {
            Microsoft.ComplexEventProcessing.Application application = server.CreateApplication("app");

            // A query for reading events from a stream
            IQStreamable<StreamableCounterSample> inputStream = CreateStream(application);

            while (true)
            {
                if (worker.CancellationPending)
                {
                    e.Cancel = true;
                    break;
                }

                // Query the stream once per pass so both values come from the same sample
                StreamableCounterSample sample = inputStream.ToObservable().ToEnumerable().Last();
                perf = sample.CpuUtilization;
                timestamp = DateTime.FromFileTime(sample.TimeStamp);
                worker.ReportProgress((int)perf);
            }

            // Return the most recent reading instead of querying the stream again
            return perf;
        }
    }
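
The method above relies on a BackgroundWorker for cancellation and progress reporting, but the wiring is not shown in this post. The following is a minimal console sketch of how such a worker could be set up. It is an assumption about the surrounding program, not the original form code, and the readings are made-up numbers rather than real counter values.

```csharp
using System;
using System.ComponentModel;
using System.Threading;

static class WorkerDemo
{
    static void Main()
    {
        var worker = new BackgroundWorker
        {
            WorkerReportsProgress = true,      // required before calling ReportProgress
            WorkerSupportsCancellation = true  // required before calling CancelAsync
        };
        var done = new ManualResetEvent(false);

        worker.DoWork += (sender, e) =>
        {
            var w = (BackgroundWorker)sender;
            int fakeReading = 0; // stand-in for the CPU value
            while (true)
            {
                if (w.CancellationPending)
                {
                    e.Cancel = true;
                    break;
                }
                fakeReading = (fakeReading + 10) % 100;
                w.ReportProgress(fakeReading);
                Thread.Sleep(100);
            }
        };

        // In a WinForms program this handler would update the form.
        worker.ProgressChanged += (sender, e) =>
            Console.WriteLine("CPU: {0}%", e.ProgressPercentage);

        worker.RunWorkerCompleted += (sender, e) =>
        {
            Console.WriteLine(e.Cancelled ? "Cancelled" : "Finished");
            done.Set();
        };

        worker.RunWorkerAsync();
        Thread.Sleep(500);    // let a few readings through
        worker.CancelAsync(); // sets CancellationPending for the loop to notice
        done.WaitOne();
    }
}
```

In a WinForms program the ProgressChanged and RunWorkerCompleted handlers run on the UI thread, which is what makes it safe to update controls from them.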


When creating the stream (the CreateStream call in the method above), I'm setting up my observation. In this case CollectSamples is my source, which is then converted to a temporal stream via ToPointStreamable. Each item is inserted as a single point event stamped with the current date and time.

Code Snippet
    static IQStreamable<StreamableCounterSample> CreateStream(Microsoft.ComplexEventProcessing.Application application)
    {
        // Live data uses IQbservable<>
        return
            application.DefineObservable(() => CollectSamples()).ToPointStreamable(
            r => PointEvent<StreamableCounterSample>.CreateInsert(DateTime.Now, r),
            AdvanceTimeSettings.StrictlyIncreasingStartTime);
    }


CollectSamples is where my data is pulled. In this case I'm pulling my data from the performance counter and returning it through an observable that paces its items at a 1-second interval.
Code Snippet
    private static IObservable<StreamableCounterSample> CollectSamples()
    {
        List<StreamableCounterSample> data = new List<StreamableCounterSample>();

        // Take one reading from the performance counter
        data.Add(new StreamableCounterSample
        {
            CpuUtilization = perfCounter.NextValue(),
            TimeStamp = DateTime.Now.ToFileTime()
        });

        return ToObservableInterval(data, TimeSpan.FromMilliseconds(1000), Scheduler.ThreadPool);
    }
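
CollectSamples reads from perfCounter, which is set up in CreateCounters, but that method is not shown in this post. As a rough sketch, it could look something like the following. The category, counter, and instance names are my assumption of a typical total-CPU counter, and the code only runs on Windows.

```csharp
using System;
using System.Diagnostics;
using System.Threading;

static class CounterDemo
{
    // Assumed shape of the counter field used by CollectSamples.
    static PerformanceCounter perfCounter;

    // Hypothetical CreateCounters: total CPU usage across all cores.
    static void CreateCounters()
    {
        perfCounter = new PerformanceCounter("Processor", "% Processor Time", "_Total");

        // This counter is computed from two reads, so the first NextValue() returns 0.
        perfCounter.NextValue();
    }

    static void Main()
    {
        CreateCounters();
        Thread.Sleep(1000); // let the counter accumulate one interval

        float cpu = perfCounter.NextValue();
        Console.WriteLine("CPU: {0:F1}% at {1}", cpu, DateTime.Now);
    }
}
```

The priming read matters here: without it, the first value delivered to the stream would always be zero.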


Code Snippet
    private static IObservable<T> ToObservableInterval<T>(IEnumerable<T> source, TimeSpan period, IScheduler scheduler)
    {
        // The enumerator is disposed when the subscription ends
        return Observable.Using(
            () => source.GetEnumerator(),
            it => Observable.Generate(
                default(object),        // state is unused
                _ => it.MoveNext(),     // continue while the source has items
                _ => _,
                _ =>
                {
                    //Console.WriteLine("Input {0}", it.Current);
                    return it.Current;
                },
                _ => period, scheduler)); // wait one period before each item
    }
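
To see what ToObservableInterval does on its own, without StreamInsight, here is a small console sketch that feeds it a list of made-up readings and prints when each one arrives. The helper is repeated here in generic form so the example is self-contained. I'm assuming Rx 2.0 or later, where Scheduler.Default is available.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

static class IntervalDemo
{
    static IObservable<T> ToObservableInterval<T>(IEnumerable<T> source, TimeSpan period, IScheduler scheduler)
    {
        return Observable.Using(
            () => source.GetEnumerator(),
            it => Observable.Generate(
                default(object),
                _ => it.MoveNext(),
                _ => _,
                _ => it.Current,
                _ => period, scheduler));
    }

    static void Main()
    {
        var readings = new List<float> { 12.5f, 40.0f, 7.25f }; // made-up values

        var paced = ToObservableInterval(readings, TimeSpan.FromMilliseconds(500), Scheduler.Default)
            .Timestamp(); // attach the arrival time to each item

        // ToEnumerable blocks until the observable completes.
        foreach (var item in paced.ToEnumerable())
        {
            Console.WriteLine("{0:HH:mm:ss.fff}  {1}", item.Timestamp, item.Value);
        }
    }
}
```

Each item should arrive roughly one period after the previous one, and the observable completes once the list is exhausted. With the single-item list that CollectSamples builds, that means one sample per subscription.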


In the end, the final product is a simple WinForms application showing the current CPU utilization and the last time it ran.


The program can be downloaded from: