« Kinesis » : différence entre les versions
(11 versions intermédiaires par le même utilisateur non affichées) | |||
Ligne 30 : | Ligne 30 : | ||
== Producers == | == Producers == | ||
Producers put records into Kinesis Data Streams. | Producers put records into Kinesis Data Streams. | ||
* EventBridge | |||
* CloudFront, collect views and analyze real-time metrics of CDN | |||
* CloudWatch | |||
<kode lang='cs' collapsed> | |||
var kinesisDataStreamName = "input-stream"; | |||
var region = RegionEndpoint.USEast1; | |||
var kinesisConfiguration = new AmazonKinesisConfig | |||
{ | |||
Profile = new Profile("..."), | |||
RegionEndpoint = region | |||
}; | |||
var kinesisClient = new AmazonKinesisClient(kinesisConfiguration); | |||
for (var i = 1; i < duration; i++) | |||
{ | |||
var message = new Message(i, DateTime.Now); | |||
var data = JsonConvert.SerializeObject(message); | |||
using var memoryStream = new MemoryStream(Encoding.UTF8.GetBytes(data)); | |||
var request = new PutRecordRequest | |||
{ | |||
StreamName = kinesisDataStreamName, | |||
Data = memoryStream, | |||
PartitionKey = "partition1" | |||
}; | |||
var response = kinesisClient.PutRecordAsync(request).Result; | |||
Console.WriteLine($"Shard ID: {response.ShardId}, Sequence Number: {response.SequenceNumber}, HTTPStatusCode: {response.HttpStatusCode}"); | |||
await Task.Delay(1000); // every 1 second | |||
} | |||
class Message(int id, DateTime date) | |||
{ | |||
public int Id { get; set; } = id; | |||
public DateTime Date { get; set; } = date; | |||
} | |||
</kode> | |||
== Consumers == | == Consumers == | ||
Consumers get records from Kinesis Data Streams and process them. | Consumers get records from Kinesis Data Streams and process them. | ||
* Data Firehose | * Data Firehose | ||
* Lambda, read data by batch | |||
* Glue, ETL | |||
* SNS | |||
= Firehose = | = Amazon Data Firehose = | ||
* ingest data from different sources | * ingest data from different sources | ||
* transform source records using AWS lambda functions | * transform source records using AWS lambda functions | ||
Ligne 45 : | Ligne 87 : | ||
== Destination == | == Destination == | ||
* S3 | * S3, [https://docs.aws.amazon.com/firehose/latest/dev/s3-prefixes.html Custom Prefixes for Amazon S3 Objects], [https://aws.amazon.com/blogs/big-data/amazon-data-firehose-custom-prefixes-for-amazon-s3-objects/ Amazon Data Firehose custom prefixes for Amazon S3 objects] | ||
* MongoDB | * MongoDB | ||
* HTTP End Point | |||
== Data transformation == | |||
Allows to modify the data while uploading to the destionation. | |||
<kode lang='cs' collapsed> | |||
// nuget packages: Amazon.Lambda.Serialization.SystemTextJson, Amazon.Lambda.Core, Amazon.Lambda.KinesisFirehoseEvents | |||
using System.Text; | |||
using System.Text.Json; | |||
using Amazon.Lambda.Core; | |||
using Amazon.Lambda.KinesisFirehoseEvents; | |||
// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class. | |||
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))] | |||
namespace FirehoseETL; | |||
// handler: FirehoseETL::FirehoseETL.Function::FunctionHandler | |||
public class Function | |||
{ | |||
public KinesisFirehoseResponse FunctionHandler(KinesisFirehoseEvent kinesisEvent, ILambdaContext context) | |||
{ | |||
var response = new KinesisFirehoseResponse { Records = [] }; | |||
foreach (var record in kinesisEvent.Records) | |||
{ | |||
var inputPayload = Encoding.UTF8.GetString(Convert.FromBase64String(record.Base64EncodedData)); | |||
var message = JsonSerializer.Deserialize<Message>(inputPayload) | |||
?? throw new Exception($"Unable to deserialize input payload: {inputPayload}"); | |||
message.Description = $"Message with id {message.Id} was processed at {DateTime.Now}"; | |||
var outputPayload = JsonSerializer.Serialize(message); | |||
var base64EncodedOutputPayload = Convert.ToBase64String(Encoding.UTF8.GetBytes(outputPayload)); | |||
var firehoseRecord = new KinesisFirehoseResponse.FirehoseRecord | |||
{ | |||
RecordId = record.RecordId, | |||
Result = KinesisFirehoseResponse.TRANSFORMED_STATE_OK, | |||
Base64EncodedData = base64EncodedOutputPayload | |||
}; | |||
response.Records.Add(firehoseRecord); | |||
} | |||
return response; | |||
} | |||
class Message(int id, DateTime date, string description) | |||
{ | |||
public int Id { get; set; } = id; | |||
public DateTime Date { get; set; } = date; | |||
public string Description { get; set; } = description; | |||
} | |||
} | |||
</kode> | |||
= Data Analytics = | |||
* Studio Notebook | |||
* Glue catalog |
Dernière version du 24 mai 2024 à 13:32
Description
Allows to gather data from different sources using data stream or fire hoses, and get that data to a destination in the expected format.
Allows to collect, process and analyze data in near real-time at scale.
Kinesis data streams are used in places where an unbounded stream of data needs to worked on in real time.
And Kinesis Firehose delivery streams are used when data needs to be delivered to a storage destination, such as S3.
Common use cases
- pattern detection
- click stream analysis (who is clicking on what and when)
- log processing for machine learning
- anomyly detection in IoT devices
Benefits
- fully managed service (focus on data and don't worry about the underlying system)
- can handle large amount of data
- can consume process and buffer data in real-time
- allows production of real-time metrics and reporting
Data Stream
Move data from sources to a destination while having analytics, monitoring alerts, and connections to other services.
Common use cases
- log and data intake and procesing
- real-time metrics, reporting, and data analytics
Benefits
- ensures data durability and elasticity
Producers
Producers put records into Kinesis Data Streams.
- EventBridge
- CloudFront, collect views and analyze real-time metrics of CDN
- CloudWatch
var kinesisDataStreamName = "input-stream"; var region = RegionEndpoint.USEast1; var kinesisConfiguration = new AmazonKinesisConfig { Profile = new Profile("..."), RegionEndpoint = region }; var kinesisClient = new AmazonKinesisClient(kinesisConfiguration); for (var i = 1; i < duration; i++) { var message = new Message(i, DateTime.Now); var data = JsonConvert.SerializeObject(message); using var memoryStream = new MemoryStream(Encoding.UTF8.GetBytes(data)); var request = new PutRecordRequest { StreamName = kinesisDataStreamName, Data = memoryStream, PartitionKey = "partition1" }; var response = kinesisClient.PutRecordAsync(request).Result; Console.WriteLine($"Shard ID: {response.ShardId}, Sequence Number: {response.SequenceNumber}, HTTPStatusCode: {response.HttpStatusCode}"); await Task.Delay(1000); // every 1 second } class Message(int id, DateTime date) { public int Id { get; set; } = id; public DateTime Date { get; set; } = date; } |
Consumers
Consumers get records from Kinesis Data Streams and process them.
- Data Firehose
- Lambda, read data by batch
- Glue, ETL
- SNS
Amazon Data Firehose
- ingest data from different sources
- transform source records using AWS lambda functions
- deliver data to a specified destination
Similar to Data Stream but acts as an ETL.
Source
- Kinesis Data Streams
Destination
- S3, Custom Prefixes for Amazon S3 Objects, Amazon Data Firehose custom prefixes for Amazon S3 objects
- MongoDB
- HTTP End Point
Data transformation
Allows to modify the data while uploading to the destionation.
// nuget packages: Amazon.Lambda.Serialization.SystemTextJson, Amazon.Lambda.Core, Amazon.Lambda.KinesisFirehoseEvents using System.Text; using System.Text.Json; using Amazon.Lambda.Core; using Amazon.Lambda.KinesisFirehoseEvents; // Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class. [assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))] namespace FirehoseETL; // handler: FirehoseETL::FirehoseETL.Function::FunctionHandler public class Function { public KinesisFirehoseResponse FunctionHandler(KinesisFirehoseEvent kinesisEvent, ILambdaContext context) { var response = new KinesisFirehoseResponse { Records = [] }; foreach (var record in kinesisEvent.Records) { var inputPayload = Encoding.UTF8.GetString(Convert.FromBase64String(record.Base64EncodedData)); var message = JsonSerializer.Deserialize<Message>(inputPayload) ?? throw new Exception($"Unable to deserialize input payload: {inputPayload}"); message.Description = $"Message with id {message.Id} was processed at {DateTime.Now}"; var outputPayload = JsonSerializer.Serialize(message); var base64EncodedOutputPayload = Convert.ToBase64String(Encoding.UTF8.GetBytes(outputPayload)); var firehoseRecord = new KinesisFirehoseResponse.FirehoseRecord { RecordId = record.RecordId, Result = KinesisFirehoseResponse.TRANSFORMED_STATE_OK, Base64EncodedData = base64EncodedOutputPayload }; response.Records.Add(firehoseRecord); } return response; } class Message(int id, DateTime date, string description) { public int Id { get; set; } = id; public DateTime Date { get; set; } = date; public string Description { get; set; } = description; } } |
Data Analytics
- Studio Notebook
- Glue catalog