As my first attempt with StreamInsight, I decided to use the Performance Counter as my stream of data. Here is a brief overview of implementing the StreamInsight portion
3 important downloads to get started, with StreamInsight:
Download Examples: http://streaminsight.codeplex.com/releases/view/90143
First I created an class to represent the data I wanted to capture in my stream. Not all data types can be used with StreamInsight (ie: enum). Here is a list of supported data types: http://msdn.microsoft.com/en-us/library/ee378905.aspx
The main portion of the program.
Here starting on line 86, I create my StreamInsight server embedded in memory. The server name is the instance name that you give after installing StreamInsight. With StreamInsight 2.1, they added IQStreamable which allows you to query the stream. Lines 104 & 105 is when the stream will be queried for the results.
When creating the stream in line 93, I'm setting up my observation. In this case CollectSamples is my source which is then converted to a temporal stream via ToPointStreamable, which inserts a single event instance with a datetime.
The collect samples, is where my data will be pulled. In this case I'm pulling my data from the performance counter, but returning an observable interval representing this temporal event that occurs every 1 second.
In the end, the final product is a simple winform showing the current CPU Utilization and the last time it ran.
The program can be downloaded from:
3 important downloads to get started, with StreamInsight:
Download
StreamInsight: http://www.microsoft.com/en-us/download/details.aspx?id=26720
Download Reactive: http://www.microsoft.com/en-us/download/confirmation.aspx?id=28568Download Examples: http://streaminsight.codeplex.com/releases/view/90143
First I created an class to represent the data I wanted to capture in my stream. Not all data types can be used with StreamInsight (ie: enum). Here is a list of supported data types: http://msdn.microsoft.com/en-us/library/ee378905.aspx
Code Snippet
- // Input events of CounterSample of supported data types
- public class StreamableCounterSample
- {
- public float CpuUtilization { get; set; }
- public long TimeStamp { get; set; }
- }
The main portion of the program.
Here starting on line 86, I create my StreamInsight server embedded in memory. The server name is the instance name that you give after installing StreamInsight. With StreamInsight 2.1, they added IQStreamable which allows you to query the stream. Lines 104 & 105 is when the stream will be queried for the results.
Code Snippet
- private float CalculatePerformance(BackgroundWorker worker, DoWorkEventArgs e)
- {
- CreateCounters();
- //embedded (in-memory)
- using (Server server = Server.Create("StreamInsight21"))
- {
- Microsoft.ComplexEventProcessing.Application application = server.CreateApplication("app");
- //A query for reading events from a stream
- IQStreamable<StreamableCounterSample> inputStream = null;
- inputStream = CreateStream(application);
- while (true)
- {
- if (worker.CancellationPending)
- {
- e.Cancel = true;
- break;
- }
- else
- {
- perf = inputStream.ToObservable().ToEnumerable().Last().CpuUtilization;
- timestamp = DateTime.FromFileTime(inputStream.ToObservable().ToEnumerable().Last().TimeStamp);
- worker.ReportProgress((int)perf);
- }
- }
- return inputStream.ToObservable().ToEnumerable().Last().CpuUtilization;
- }
- }
When creating the stream in line 93, I'm setting up my observation. In this case CollectSamples is my source which is then converted to a temporal stream via ToPointStreamable, which inserts a single event instance with a datetime.
Code Snippet
- static IQStreamable<StreamableCounterSample> CreateStream(Microsoft.ComplexEventProcessing.Application application)
- {
- // Live data uses IQbservable<>
- return
- application.DefineObservable(() => CollectSamples()).ToPointStreamable(
- r => PointEvent<StreamableCounterSample>.CreateInsert(DateTime.Now, r),
- AdvanceTimeSettings.StrictlyIncreasingStartTime);
- }
The collect samples, is where my data will be pulled. In this case I'm pulling my data from the performance counter, but returning an observable interval representing this temporal event that occurs every 1 second.
Code Snippet
- private static IObservable<StreamableCounterSample> CollectSamples()
- {
- List<StreamableCounterSample> data = new List<StreamableCounterSample>();
- data.Add(new StreamableCounterSample
- {
- CpuUtilization = perfCounter.NextValue(),
- TimeStamp = DateTime.Now.ToFileTime()
- });
- return ToObservableInterval(data, TimeSpan.FromMilliseconds(1000), Scheduler.ThreadPool);
- }
Code Snippet
- private static IObservable
ToObservableInterval (IEnumerable source, TimeSpan period, IScheduler scheduler) - {
- return Observable.Using(
- () => source.GetEnumerator(),
- it => Observable.Generate(
- default(object),
- _ => it.MoveNext(),
- _ => _,
- _ =>
- {
- //Console.WriteLine("Input {0}", it.Current);
- return it.Current;
- },
- _ => period, scheduler));
- }
In the end, the final product is a simple winform showing the current CPU Utilization and the last time it ran.
The program can be downloaded from: