Slidingwindows Python Apache Beam Duplicate The Data
Solution 1:
All elements that belong to the window are emitted. If an element belongs to multiple windows it will be emitted in each window.
Accumulation mode only matters if you plan to handle late data or multiple trigger firings. In discarding mode, each firing emits only the elements that arrived in the window since the previous firing; elements that were already emitted are discarded and not emitted again. In accumulating mode, every firing emits the whole window: the elements that were already emitted last time plus the new ones that have arrived since then.
If I understand your example, you have sliding windows with a length of 30 seconds that start every 15 seconds, so consecutive windows overlap by 15 seconds:
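To make the difference concrete, here is a small plain-Python simulation of one window whose trigger fires three times. It is only a sketch of the behaviour described above, not Beam code: `fire_panes` and the sample elements are made up for illustration. In Beam itself the two modes are selected with `AccumulationMode.DISCARDING` and `AccumulationMode.ACCUMULATING`.

```python
def fire_panes(arrivals_per_firing, accumulating):
    """Return what ONE window emits at each trigger firing.

    arrivals_per_firing: list of lists, the elements that arrived
    between the previous firing and this one (hypothetical data).
    """
    panes = []
    seen = []  # everything that has arrived in the window so far
    for new_elements in arrivals_per_firing:
        seen.extend(new_elements)
        if accumulating:
            # Accumulating: re-emit old elements together with new ones.
            panes.append(list(seen))
        else:
            # Discarding: emit only what arrived since the last firing.
            panes.append(list(new_elements))
    return panes


arrivals = [["a", "b"], ["c"], ["d", "e"]]
discarding = fire_panes(arrivals, accumulating=False)
accumulating = fire_panes(arrivals, accumulating=True)
print(discarding)    # [['a', 'b'], ['c'], ['d', 'e']]
print(accumulating)  # [['a', 'b'], ['a', 'b', 'c'], ['a', 'b', 'c', 'd', 'e']]
```

If the trigger fires only once per window, both modes produce the same single pane, which is why the mode is irrelevant to the duplication you are seeing.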
time: ----t+00---t+15---t+30---t+45---t+60------>
          :      :      :      :      :
w1:       |=============|      :      :
w2:              |=============|      :
w3:                     |=============|
...
So in your case every element belongs to exactly two windows. The start and end of the diagram only look different because the windows before w1 and after w3 are not drawn.
E.g. in your example, if your message was sent between 17:07:15 and 17:07:30 it will appear in both windows.
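The assignment can be reproduced with a few lines of plain Python. This is a sketch that mirrors the arithmetic as I understand it (window starts aligned to multiples of the period, with an offset of 0 assumed); `sliding_windows` is a hypothetical helper, not part of the Beam API, and timestamps are plain seconds.

```python
def sliding_windows(ts, size, period, offset=0):
    """Return every [start, end) window, in seconds, that contains ts."""
    # Latest window start that is <= ts and aligned to the period.
    start = ts - (ts - offset) % period
    windows = []
    # Walk backwards one period at a time while the window still covers ts.
    while start > ts - size:
        windows.append((start, start + size))
        start -= period
    return sorted(windows)


# 30-second windows starting every 15 seconds, as in the diagram.
for ts in (5, 20, 40):
    print(ts, sliding_windows(ts, size=30, period=15))
# 5 [(-15, 15), (0, 30)]
# 20 [(0, 30), (15, 45)]
# 40 [(15, 45), (30, 60)]
```

An element at t+20 lands in both w1 and w2, so anything downstream that works per window sees it twice. The element at t+5 also falls into two windows, because a window starting at t-15 exists even though the diagram does not show it.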
Fixed windows don't overlap, so an element can belong to only one window:
time: ----t+00---t+15---t+30---t+45---t+60------>
          :             :             :
w1:       |=============|             :
w2:                     |=============|
w3:                                   |====...
...
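For comparison, the fixed-window case reduces to a single start computation. Again a plain-Python sketch with a hypothetical helper name (`fixed_window`) and an assumed offset of 0, not a call into Beam:

```python
def fixed_window(ts, size, offset=0):
    """Return the single [start, end) window, in seconds, containing ts."""
    start = ts - (ts - offset) % size
    return (start, start + size)


# 30-second fixed windows, as in the diagram.
for ts in (20, 30, 59):
    print(ts, fixed_window(ts, size=30))
# 20 (0, 30)
# 30 (30, 60)
# 59 (30, 60)
```

Each timestamp maps to exactly one window, so nothing is emitted twice. If you do not need overlapping results, switching from sliding to fixed windows removes the duplication.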
More about windows here: https://beam.apache.org/documentation/programming-guide/#windowing
Solution 2:
I have exactly the same issue, but in Java. I have a window with a duration of 10 seconds and a step of 3 seconds. When an event is emitted from the MQTT topic that I subscribe to, the ParDo function that I have appears to run and emit the first and only event to all three of the "constructed" windows.
X is the event that I sent at a random timestamp: 2020-09-15T21:17:57.292Z
time: ----t+00---t+15---t+30---t+45---t+60------>
          :      :      :      :      :
w1:       |X============|      :      :
w2:              |X============|      :
w3:                     |X============|
...
Even the same timestamp is assigned to all of them! I must really be doing something completely wrong.
I use Scala 2.12 and BEAM 2.23 with a Direct Runner.
[Hint]: I use state in the processElement function, where the state is held per key + window. Maybe there is a bug there? I will try to test it without state.
UPDATE: After removing the state fields, the single event is assigned to one window.
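For reference, the windows that a 10-second duration with a 3-second step would cover for that event timestamp can be listed with the same kind of arithmetic. This is a rough plain-Python sketch, not Beam code: it assumes window starts are aligned to multiples of the step since the epoch (offset 0), and `windows_containing` is a hypothetical helper. Because 10 is not a multiple of 3, a timestamp falls into three or four windows depending on where it sits within the step.

```python
from datetime import datetime, timedelta, timezone


def windows_containing(ts_ms, size_ms, period_ms, offset_ms=0):
    """Return every [start, end) window, in epoch millis, containing ts_ms."""
    start = ts_ms - (ts_ms - offset_ms) % period_ms
    windows = []
    while start > ts_ms - size_ms:
        windows.append((start, start + size_ms))
        start -= period_ms
    return sorted(windows)


epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
event = datetime(2020, 9, 15, 21, 17, 57, 292000, tzinfo=timezone.utc)
# Integer milliseconds since the epoch, avoiding float rounding.
ts_ms = (event - epoch) // timedelta(milliseconds=1)

windows = windows_containing(ts_ms, size_ms=10_000, period_ms=3_000)
for start, end in windows:
    print(epoch + timedelta(milliseconds=start), "->",
          epoch + timedelta(milliseconds=end))
```

Every listed window contains the event, and the event keeps its own timestamp in each of them, so seeing the same timestamp in several windows is expected with sliding windows.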