

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

# Mengubah data menggunakan operator di Managed Service untuk Apache Flink dengan API DataStream
<a name="how-operators"></a>

*Untuk mengubah data masuk dalam Layanan Terkelola untuk Apache Flink, Anda menggunakan operator Apache Flink.* Operator Apache Flink mengubah satu atau beberapa aliran data menjadi aliran data baru. Aliran data baru berisi data yang dimodifikasi dari aliran data asli. Apache Flink menyediakan lebih dari 25 operator pemrosesan aliran yang dibangun sebelumnya. Untuk informasi selengkapnya, lihat [Operator](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/operators/overview/) di Dokumentasi Apache Flink.

**Topics**
+ [

## Gunakan operator transformasi
](#how-operators-transform)
+ [

## Gunakan operator agregasi
](#how-operators-agg)

## Gunakan operator transformasi
<a name="how-operators-transform"></a>

Berikut adalah contoh transformasi teks sederhana pada salah satu bidang aliran data JSON. 

Kode ini membuat aliran data yang diubah. Aliran data baru memiliki data yang sama dengan aliran asli, dengan string "` Company`" yang ditambahkan ke isi bidang `TICKER`.

```
DataStream<ObjectNode> output = input.map(
    new MapFunction<ObjectNode, ObjectNode>() {
        @Override
        public ObjectNode map(ObjectNode value) throws Exception {
            return value.put("TICKER", value.get("TICKER").asText() + " Company");
        }
    }
);
```

## Gunakan operator agregasi
<a name="how-operators-agg"></a>

Berikut adalah contoh operator agregasi. Kode membuat aliran data agregat. Operator membuat jendela tumbling 5 detik dan menampilkan jumlah dari nilai `PRICE` untuk catatan di jendela dengan nilai `TICKER` yang sama.

```
DataStream<ObjectNode> output = input.keyBy(node -> node.get("TICKER").asText())
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .reduce((node1, node2) -> {
        double priceTotal = node1.get("PRICE").asDouble() + node2.get("PRICE").asDouble();
        node1.replace("PRICE", JsonNodeFactory.instance.numberNode(priceTotal));
    return node1;
});
```

Untuk contoh kode lainnya, lihat[Contoh untuk membuat dan bekerja dengan Managed Service untuk aplikasi Apache Flink](examples-collapsibles.md). 