# Reducing communication overhead over distributed data streams by filtering frequent items.

Abstract. In distributed data stream systems, the available communication bandwidth is a bottleneck resource. To make better use of it, communication overhead should be reduced as much as possible while still meeting the precision requirements of queries. In this paper, a new approach is proposed for transferring data streams in distributed data stream systems. By transferring the estimated occurrence counts of frequent items instead of the raw frequent items themselves, it saves a large amount of communication overhead. Meanwhile, to guarantee the precision of queries, the difference between the estimated and true occurrence counts of each frequent item is also sent to the central stream processor. We present an algorithm for processing frequent items over distributed data streams and a method for supporting aggregate queries over the preprocessed frequent items. Finally, experimental results demonstrate the efficiency of our method.

Categories and Subject Descriptors

C.2.1 [Data Structures]: Distributed data structures; C.2.1 [Network Architecture and Design]: Wireless communication, Network communication; B.4 [Input/Output and Data Communications]: B.4.1 Data Communications Devices

General Terms

MPEG-4, Segment-based adaptation, UMA, Video adaptation

Keywords: Communication bandwidth, Distributed data, Network communication

1 Introduction

In many recent applications, data takes the form of continuous data streams, rather than finite stored data sets. Examples include stock ticks in financial applications, log records or click-streams in Web tracking and personalization, data feeds from sensor applications, and so on.

Processing distributed data streams has attracted the attention of researchers in the database community [1,2,3,4]. In distributed data stream systems, the available communication bandwidth becomes a bottleneck resource [1]. In sensor networks in particular, the battery energy that sensors use to transmit data is also a bottleneck resource [5]. Hence, a key research issue in distributed data stream systems is how to reduce the transferred data volume as much as possible, subject to the precision requirements of queries, in order to save communication cost or energy.

At present, there are three kinds of methods for reducing communication cost. The first installs filters at remote data sources to discard unimportant data [1]. The second pushes some operations down to the remote data source nodes for execution [5]. The third compresses raw data at the remote data source nodes before transferring it [6]. However, all three methods have drawbacks.

(1) The first method cannot support ad-hoc queries in real time, because the remote data source nodes transmit only the data satisfying the registered queries, e.g. Temperature ≥ 10. If a new ad-hoc query needs to access all recent data, the central stream processor cannot answer it, since the data satisfying Temperature < 10 is missing.

(2) The second method limits the sharing of query results, since the operations pushed to the remote data source nodes may be complicated. Moreover, for some operations, such as joins, the raw data streams still need to be transferred to the central stream processor node, which causes redundant data to be transmitted.

(3) With the third method, it is hard to guarantee the precision of queries over compressed data. In addition, compressing the raw data may introduce delay, so that some important data cannot reach the central stream processor in time and the real-time requirements of applications cannot be met.

In short, although previous work can reduce communication cost, it cannot broadly support real-time queries with a good precision guarantee. Since frequent items in data streams account for most of the communication cost, reducing the volume of transferred frequent items can greatly improve the availability of communication bandwidth. In our work, we propose to transfer the estimated occurrence counts of frequent items, instead of the raw frequent items, in distributed data stream systems in order to save communication overhead. Meanwhile, to guarantee the precision of queries, the difference between the estimated and true occurrence counts of each frequent item is also sent to the central stream processor. Thus, our method overcomes the drawbacks of the previous methods: it reduces the communication overhead while guaranteeing the precision of queries.

2 Related Work

Distributed data stream systems have attracted considerable research interest. [2] describes methods for processing distributed data streams in Aurora*. [1] studies how to save bandwidth in distributed data stream systems by installing filters on the remote data sources. [3] discusses how to support Top-K queries in distributed data stream systems. [8] introduces an architecture for processing data streams in wireless sensor networks. [5] discusses query handling in wireless sensor networks. [4] gives several distributed stream algorithms, and [9] studies strategies for routing tuples among distributed operators. However, these works determine the content and form of the data transferred over the network based on the requirements of queries. Our method differs in that it does not depend on query requirements; instead, it mainly decreases the transferred volume of frequent items in data streams, thereby reducing the total communication cost.

To save communication cost, [6] proposes replacing raw data sub-series with their midrange value (the PMC-MR method) or their mean value (the PMC-MEAN method). [10] compresses the transferred raw data using base signals extracted from the data streams. Our method differs from both: it focuses on compressing the transferred frequent items in data streams and provides a strong guarantee on the precision of queries.

3 Model and Data Structure

3.1 Model of distributed data stream systems

Figure 1 illustrates the model of a distributed data stream system. The system consists of three kinds of processor nodes: the central stream processor node, relay nodes and remote data source nodes. Remote data source nodes produce and collect data and transmit the raw data to relay nodes. Relay nodes preprocess the received raw data and transmit the result to the central stream processor node. The central stream processor node handles users' queries and returns the answers. A controller on the central stream processor node sends the registered requirements and precision constraints of queries to the relay nodes.

[FIGURE 1 OMITTED]

3.2 Frequent items

The data stream from the remote data source nodes can be viewed as an unbounded multiset of pairs $\langle r, t \rangle$, where $r$ denotes an item and $t$ denotes its timestamp (i.e. its position in time order), which is determined either by the time when $r$ enters the data stream system or by the time when the remote data source produces $r$ [15]. Suppose the items in a data stream form the series $r_1, \dots, r_n$, and $F(r_i)$ denotes the number of occurrences of item $r_i$ in the stream. Given a real number $\lambda$ with $0 \le \lambda \le 1$, if $F(r_i) \ge \lambda \times n$ holds, $r_i$ is called a frequent item of the data stream, and $\lambda$ is called the support-degree of frequent items.

Definition 1. (Derived-body, Forecast-body, Correct-body) Let $r$ be a frequent item. The triple $(r, T_v, Count)$ is called a derived-body of $r$, where $T_v = [t_b, t_e]$ is the lifetime of $r$ and $|T_v| = t_e - t_b$. Derived-bodies are divided into two classes, forecast-bodies and correct-bodies. When the derived-body is a forecast-body (denoted by $Prb$), $Count$ is the estimated number of occurrences of $r$ within $T_v$. When the derived-body is a correct-body (denoted by $Crb$), $Count$ is the difference between the true and the estimated numbers of occurrences of $r$ within $T_v$.

Definition 2. (Querying-metadata) A pair $(Qid, T_v)$ is called querying-metadata, denoted by $MoQ$, where $Qid$ identifies the query and $T_v = [t_b, t_e]$ is the querying time interval.

Each registered query, especially a monitoring query, can specify a querying time interval, which is represented by querying-metadata. The controller on the central stream processor node collects querying-metadata from the registered queries and sends it to the relay nodes. To control when derived-bodies of frequent items are generated, each relay node maintains a querying-metadata series $MoQ_1, \dots, MoQ_n$ ordered by end time, i.e. $MoQ_i.T_v.t_e \le MoQ_j.T_v.t_e$ whenever $i < j$.

3.3 FDI structure

[FIGURE 2 OMITTED]

Each relay node maintains a data structure for frequent data items (denoted by FDI), shown in Figure 2, which stores information about the frequent items expected to occur within a future period $T_p$. For each frequent item $r$, the information includes the raw item $r$, the estimated number of occurrences $r.f_e$ of $r$ within $T_p$, and the true number of occurrences $r.f_r$ of $r$ within $T_p$. $T_p$ is called the lifetime of FDI, and $r.f_e$ is an approximation of $r.f_r$. We use $TopK$ to denote the set of all frequent items in FDI.

The information of frequent items in the FDI structure is maintained as follows. Initially, $r.f_r$ is set to zero for each frequent item $r$ in FDI. When the relay node receives a raw item $s$ from a remote data source node, it checks whether $s \in TopK$ holds. If so, $s.f_r$ in FDI is incremented by one.
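A minimal Python sketch of the FDI structure and its maintenance rule may help fix the bookkeeping. The class and method names (`FDIEntry`, `FDI`, `observe`) are hypothetical, and items are assumed to be hashable values; the sketch only models the per-item pair $(f_e, f_r)$ and the membership test against $TopK$.

```python
from dataclasses import dataclass, field
from typing import Dict, Hashable


@dataclass
class FDIEntry:
    f_e: float = 0.0  # estimated number of occurrences within T_p
    f_r: int = 0      # true number of occurrences observed within T_p (starts at zero)


@dataclass
class FDI:
    t_b: float  # start of the lifetime T_p
    t_e: float  # end of the lifetime T_p
    entries: Dict[Hashable, FDIEntry] = field(default_factory=dict)

    def top_k(self) -> set:
        """The set TopK of frequent items currently held in FDI."""
        return set(self.entries)

    def observe(self, s: Hashable) -> bool:
        """Process a raw item s: if s is in TopK, increment s.f_r and return True."""
        entry = self.entries.get(s)
        if entry is None:
            return False  # s is not a frequent item
        entry.f_r += 1
        return True
```

A relay node would call `observe` for each arriving raw item and forward the item only when it returns `False`.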

3.4 Research issue

Based on the model in Figure 1, the research issue addressed in this paper is how relay nodes should generate derived-bodies of frequent items in data streams, so as to filter out raw frequent items and reduce communication cost subject to the precision requirements of queries. We also study how the central stream processor node should process derived-bodies.

4 Processing Data Streams on Relay Nodes

4.1 Determining frequent items

In distributed data stream systems, reducing the volume of transferred frequent items can greatly decrease communication overhead. Therefore, the frequent items in the data streams must first be determined.

Several methods exist for statistical analysis over data streams [11,14]; we adopt the statistical analysis method (the hCount algorithm) and the output method (the eFreq algorithm) proposed in [11] to obtain frequent items. The advantages of hCount and eFreq are that they are simple and easy to implement, that frequent items can be output with a single scan over the data stream, and that the raw data need not be preserved.

Suppose the domain of items in the data streams is $D$. Let $H_0, \dots, H_{h-1}$ be $h$ different hash functions, each of which maps $D$ into the integer interval $[0, m-1]$ independently and uniformly. The data structure used by hCount is a two-dimensional array $S$ of size $h \times m$, whose cells are initially zero. During the time interval $T_h = [t_b, t_e]$, when a new item $k$ arrives, $S[i][H_i(k)]$ is incremented by one for every $i$ with $0 \le i \le h-1$. When the system clock reaches $t_e$, eFreq approximates the number of occurrences of any item $k$ within $[t_b, t_e]$ from $S$ as $F(k) = \min\{S[i][H_i(k)] \mid 0 \le i \le h-1\}$. Given the support-degree $\lambda$, eFreq outputs the set of frequent items $TopK = \{k \mid F(k) \ge \lambda \times N \wedge k \in D\}$, where $T_h$ is called the valid-statistical-interval of hCount and $N$ is the total number of occurrences of all items within $T_h$.
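The hCount/eFreq procedure described above is a count-min-style sketch: every row over-counts an item only through hash collisions, so the row-wise minimum never underestimates the true count. The following Python sketch illustrates it under stated assumptions: the hash family $H_i$ is modelled by Python's built-in `hash` salted per row (not the family used in [11]), and the names `HCount` and `efreq` are hypothetical.

```python
import random
from typing import Hashable, Iterable, List, Set


class HCount:
    """h x m counter array S; row i is indexed by hash function H_i."""

    def __init__(self, h: int, m: int, seed: int = 0):
        self.h, self.m = h, m
        self.S: List[List[int]] = [[0] * m for _ in range(h)]
        rng = random.Random(seed)
        # Assumption: H_i is modelled by a per-row random salt combined with hash().
        self.salts = [rng.getrandbits(64) for _ in range(h)]
        self.N = 0  # total number of items seen within T_h

    def _H(self, i: int, k: Hashable) -> int:
        return hash((self.salts[i], k)) % self.m

    def add(self, k: Hashable) -> None:
        for i in range(self.h):
            self.S[i][self._H(i, k)] += 1
        self.N += 1

    def estimate(self, k: Hashable) -> int:
        """eFreq estimate F(k) = min_i S[i][H_i(k)]; never below the true count."""
        return min(self.S[i][self._H(i, k)] for i in range(self.h))

    def clear(self) -> None:
        self.S = [[0] * self.m for _ in range(self.h)]
        self.N = 0


def efreq(sketch: HCount, domain: Iterable[Hashable], lam: float) -> Set[Hashable]:
    """TopK = {k in D | F(k) >= lam * N}."""
    return {k for k in domain if sketch.estimate(k) >= lam * sketch.N}
```

Because the estimate can only overshoot, a non-frequent item may occasionally be reported as frequent, but a truly frequent item is never missed.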

4.2 Processing frequent items

4.2.1 Generating forecast-bodies

Forecast-bodies should be generated and transmitted at the beginning of their lifetime; otherwise, queries over frequent items on the central stream processor will be delayed. Concretely, forecast-bodies of frequent items are generated at the end time of the first querying requirement in the querying-metadata series (see Section 3.2). The algorithm for generating forecast-bodies is as follows.

Algorithm 1: Generate-Prb

(1). Empty the FDI structure, obtain frequent items using the eFreq algorithm and store them in the FDI structure;

(2). For each frequent item $k$ in the FDI structure, set $k.f_r = 0$; $k.f_e$ is set in step (4);

(3). Determine the lifetime $T_p$ of the FDI structure, where $T_p.t_b = Now$, $T_p.t_e = MoQ_1.T_v.t_e$ (i.e. the end time of the first querying requirement in the querying-metadata series) and $Now$ denotes the current system clock;

(4). For each frequent item $k$ in $TopK$, generate and transmit the forecast-body $Prb_k = (k, T_v, Count)$ of $k$, where $Prb_k.T_v = T_p$ and $Prb_k.Count = |Prb_k.T_v| \times F(k) / |T_h|$, and record $k.f_e = Prb_k.Count$ in FDI. Here $T_h$ is the valid-statistical-interval of the hCount algorithm and $F(k)$ is the number of occurrences of $k$ computed by eFreq within $T_h$.
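The core of Generate-Prb is scaling each eFreq count from the statistics interval $T_h$ to the new lifetime $T_p$. The Python sketch below assumes that $F(k)$ has already been computed for every $k \in TopK$ (passed in as a dictionary) and follows the FDI description in Section 3.3: $f_r$ starts at zero and $f_e$ holds the forecast. The function name `generate_prb` and the `[f_e, f_r]` list layout are hypothetical.

```python
from typing import Dict, Hashable, List, Tuple

# A derived-body (k, T_v, Count); T_v is a (t_b, t_e) pair.
DerivedBody = Tuple[Hashable, Tuple[float, float], float]


def generate_prb(freq: Dict[Hashable, int], t_h: Tuple[float, float],
                 now: float, first_moq_end: float):
    """Sketch of Generate-Prb.

    freq: F(k) for each k in TopK, as output by eFreq over T_h.
    Returns (T_p, FDI as {k: [f_e, f_r]}, list of forecast-bodies to transmit).
    """
    t_p = (now, first_moq_end)               # step (3): lifetime of FDI
    span_h = t_h[1] - t_h[0]
    span_p = t_p[1] - t_p[0]
    fdi: Dict[Hashable, List[float]] = {}
    bodies: List[DerivedBody] = []
    for k, f in freq.items():
        count = span_p * f / span_h          # step (4): |T_p| * F(k) / |T_h|
        fdi[k] = [count, 0]                  # f_e = forecast, f_r starts at zero
        bodies.append((k, t_p, count))
    return t_p, fdi, bodies
```

For example, an item seen 40 times in a 100-tick statistics interval is forecast to occur 20 times in a 50-tick lifetime.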

4.2.2 Processing data during lifetime of FDI

After the forecast-bodies of frequent items are generated, i.e. during the lifetime of FDI, relay nodes continue to process the arriving raw data and reduce the volume of data transferred to the central stream processor as much as possible. Meanwhile, relay nodes also prepare to generate the next round of forecast-bodies. During this process, the following variables are maintained: the true total number of occurrences of frequent items ($C_f$), the number of occurrences of all items ($C_r$), the sum of the values of frequent items ($S_f$) and the sum of the values of all items ($S_r$). The algorithm for processing data during the lifetime $T_p$ of FDI on relay nodes is as follows.

Algorithm 2: Processing-within-Tp

(1) Clear the two-dimensional array $S$ of the hCount algorithm and set the valid-statistical-interval $T_h$ of hCount to $T_p$;

(2) Initialize the variables: $C_f = 0$, $C_r = 0$, $S_f = 0$ and $S_r = 0$;

(3) WHILE (a new item $r$ arrives and $Now \le T_p.t_e$ holds)

(4) Invoke the hCount algorithm with input $r$;

(5) IF ($r \in TopK$) $r.f_r$++, $C_f$++, $S_f$ += $r$; /* r is a frequent item and is not sent to the central stream processor. */

(6) ELSE transmit $r$ to the central stream processor; /* r is a non-frequent item. */

(7) $C_r$++, $S_r$ += $r$;
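The loop of Processing-within-Tp can be sketched in Python as follows, accumulating $S_r$ as a running sum as its definition (the sum of all items) implies. Assumptions: the stream is a list of `(value, timestamp)` pairs with numeric values so that $S_f$ and $S_r$ are meaningful, FDI is a dictionary mapping each frequent item to `[f_e, f_r]`, and the optional `hcount_add` callback is a hypothetical stand-in for step (4).

```python
from typing import Dict, Iterable, List, Tuple


def process_within_tp(stream: Iterable[Tuple[float, float]], t_p_end: float,
                      fdi: Dict[float, List[float]], hcount_add=None):
    """Sketch of Processing-within-Tp.

    Returns (transmitted non-frequent items, C_f, C_r, S_f, S_r).
    """
    c_f = c_r = 0
    s_f = s_r = 0.0
    sent = []
    for r, ts in stream:
        if ts > t_p_end:          # loop condition Now <= T_p.t_e; later items belong
            break                 # to the next round and are not handled here
        if hcount_add is not None:
            hcount_add(r)         # step (4): update next round's statistics
        if r in fdi:              # step (5): frequent item, filtered locally
            fdi[r][1] += 1
            c_f += 1
            s_f += r
        else:                     # step (6): non-frequent item, transmitted
            sent.append((r, ts))
        c_r += 1                  # step (7): counts over all items
        s_r += r
    return sent, c_f, c_r, s_f, s_r
```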

4.2.3 Generating correct-bodies

Because the characteristics of data streams fluctuate continuously, the estimated numbers of occurrences in forecast-bodies will deviate from the true numbers of occurrences of frequent items. To guarantee the precision of queries on the central stream processor node, relay nodes should also transmit correct-bodies of frequent items to correct the previously transferred forecast-bodies. Correct-bodies should be generated and transmitted before the next round of forecast-bodies is generated, i.e. at the end of the lifetime of FDI. Note that the correct-body of a frequent item $k$ is generated at the deadline of the forecast-body of $k$.

Since transferring correct-bodies also costs communication, we can save further overhead, while still guaranteeing the precision of queries, by transmitting only the correct-bodies of those frequent items $k$ that violate constraint (1) below, instead of all correct-bodies. Here $\delta$ is called the correct-factor.

$$|k.f_e - k.f_r| \le \delta \times k.f_r \qquad (1)$$

The algorithm for generating correct-bodies is as follows.

Algorithm 3: Generate-Crb

(1) FOR (each frequent item $k$ in $TopK$)

(2) IF ($|k.f_e - k.f_r| > \delta \times k.f_r$)

(3) Generate the correct-body $Crb_k = (k, T_v, Count)$ of $k$, where $Crb_k.T_v = T_p$, $Crb_k.Count = k.f_r - k.f_e$ and $T_p$ is the lifetime of FDI;

(4) Transmit the correct-body $Crb_k$ of $k$ to the central stream processor;

(5) Delete $MoQ_1$ from the querying-metadata series;
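Generate-Crb is a filter over FDI: only items whose forecast misses the true count by more than $\delta \times k.f_r$ produce a correct-body, whose $Count$ carries the signed difference $f_r - f_e$. A minimal Python sketch follows; the function name `generate_crb`, the `[f_e, f_r]` layout and the representation of the querying-metadata series as a Python list of `(Qid, T_v)` pairs are assumptions for illustration.

```python
from typing import Dict, Hashable, List, Tuple


def generate_crb(fdi: Dict[Hashable, List[float]], t_p: Tuple[float, float],
                 delta: float, moq_series: List[Tuple[str, Tuple[float, float]]]):
    """Sketch of Generate-Crb; fdi maps k -> [f_e, f_r]."""
    crbs = []
    for k, (f_e, f_r) in fdi.items():
        if abs(f_e - f_r) > delta * f_r:          # constraint (1) is violated
            crbs.append((k, t_p, f_r - f_e))      # Crb_k.Count = f_r - f_e
    moq_series.pop(0)                             # step (5): delete MoQ_1
    return crbs
```

A negative `Count` means the forecast overestimated the item; the central processor adds it just like a positive correction.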

Let $HS$ denote the subset of frequent items in $TopK$ that satisfy constraint (1). The correct-bodies of the frequent items in $HS$ are not sent to the central stream processor, so queries over these items incur some error. In the Generate-Crb algorithm, the value of the correct-factor $\delta$ is determined by the precision requirements of queries on the central stream processor. Since this paper mainly focuses on aggregate queries over distributed data streams, we now study the relationship between the precision requirements of aggregate queries and the correct-factor.

Theorem 1. Suppose the maximum tolerated relative error of COUNT aggregation is $\eta_c$. Then the correct-factor must satisfy $\delta \le \eta_c \times C_r / C_f$.

Proof: The input of COUNT aggregation on the central stream processor may come from many relay nodes. The relative error of COUNT aggregation caused by all relay nodes is bounded by formula (2),

$$\psi = \frac{|\Delta C|}{C_r} \le \frac{\sum_i |\Delta C_i|}{\sum_i C_{ri}} \qquad (2)$$

where $\Delta C$ is the total absolute error, $C_r$ is the true answer, $\Delta C_i$ is the absolute error caused by the $i$-th relay node, and $C_{ri}$ is the contribution of the $i$-th relay node to $C_r$. For any relay node $i$ with correct-factor $\delta_i$, formula (3) gives the absolute error of COUNT aggregation based on the data transferred to the central stream processor,

$$|\Delta C_i| = \Big|\sum_{k \in HS} Crb_k.Count\Big| \le \sum_{k \in HS} |Crb_k.Count| \qquad (3)$$

where $Crb_k$ is the correct-body that would have been generated for $k$ but was suppressed. By constraint (1) and step (3) of the Generate-Crb algorithm, $|Crb_k.Count| / k.f_r \le \delta_i$ holds for every $k \in HS$. Therefore,

$$\sum_{k \in HS} |Crb_k.Count| \le \delta_i \times \sum_{k \in HS} k.f_r \le \delta_i \times C_{fi} \qquad (4)$$

where $C_{fi}$ is the total true number of occurrences of frequent items obtained on the $i$-th relay node. Substituting formulas (3) and (4) into formula (2), we obtain

$$\psi \le \frac{\sum_i \delta_i C_{fi}}{\sum_i C_{ri}} = \frac{\sum_i \left(\delta_i C_{fi} / C_{ri}\right) C_{ri}}{\sum_i C_{ri}} \le \max_i\left\{\frac{\delta_i C_{fi}}{C_{ri}}\right\} \times \frac{\sum_i C_{ri}}{\sum_i C_{ri}} = \max_i\left\{\frac{\delta_i C_{fi}}{C_{ri}}\right\}.$$

Hence $\psi \le \eta_c$ is guaranteed if $\delta_i \times C_{fi} / C_{ri} \le \eta_c$ for every relay node $i$. That is, on each relay node the correct-factor must satisfy $\delta \le \eta_c \times C_r / C_f$ to guarantee the precision of COUNT aggregate queries on the central stream processor.
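The bound in Theorem 1 can be checked numerically. The Python sketch below computes the largest correct-factor $\eta_c \times C_r / C_f$ per relay node and evaluates the worst-case error bound $\sum_i \delta_i C_{fi} / \sum_i C_{ri}$ from the proof; when every node uses exactly its largest allowed $\delta_i$, the bound equals $\eta_c$. The function names and the per-node `(C_ri, C_fi)` tuples are hypothetical, and the node counts in the check are illustrative, not experimental data.

```python
from typing import Callable, List, Tuple


def count_delta_bound(eta_c: float, c_r: int, c_f: int) -> float:
    """Largest correct-factor allowed by Theorem 1: eta_c * C_r / C_f."""
    if c_f == 0:
        return float("inf")  # no frequent items, so any delta satisfies the bound
    return eta_c * c_r / c_f


def count_error_bound(nodes: List[Tuple[int, int]],
                      delta_of: Callable[[int, int], float]) -> float:
    """Worst-case relative COUNT error sum_i(delta_i * C_fi) / sum_i C_ri.

    nodes: (C_ri, C_fi) for each relay node; delta_of chooses delta_i per node.
    """
    num = sum(delta_of(c_r, c_f) * c_f for c_r, c_f in nodes)
    den = sum(c_r for c_r, _ in nodes)
    return num / den
```

Each node can pick its own $\delta_i$ from purely local counts, which is what lets the relay nodes act without coordinating with each other.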

Theorem 2. Suppose the maximum tolerated relative error of SUM aggregation is $\eta_s$. Then the correct-factor must satisfy $\delta \le \eta_s \times S_r / S_f$.

Proof: The proof is analogous to that of Theorem 1 (with item values assumed non-negative) and is omitted. Note that every distinct item value reaches the central stream processor node, either as a raw item or inside a derived-body, so MAX and MIN aggregation produce almost no error.

4.3 Algorithm of processing data streams on relay nodes

Input: raw data streams and the querying-metadata series $MoQ_1, \dots, MoQ_n$, where $MoQ_i.T_v.t_e \le MoQ_j.T_v.t_e$ for $i < j$.

Output: derived-bodies of frequent items in data streams.

Algorithm 4: RelayNode-Processing

(1) As data continuously arrives, use the hCount algorithm to collect statistics;

(2) WHILE (the querying-metadata series is not empty)

(3) Invoke Generate-Prb algorithm;

(4) Invoke Processing-within-Tp algorithm;

(5) Invoke Generate-Crb algorithm;
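To show how the three sub-algorithms interlock round by round, here is a compact, self-contained Python simulation of RelayNode-Processing. It makes several simplifying assumptions that are not from the paper: exact counting with `collections.Counter` stands in for hCount/eFreq, timestamps are integers, each lifetime is treated as the half-open interval $[t_b, t_e)$, and an initial warm-up interval $[0, warmup\_end)$ supplies the first statistics. The function name and message tuples are hypothetical.

```python
from collections import Counter
from typing import List, Tuple


def relay_node_processing(stream: List[Tuple[int, int]], moq_ends: List[int],
                          lam: float, delta: float, warmup_end: int) -> list:
    """Return the messages ("Prb" | "Crb" | "raw", ...) sent to the central processor."""
    moq = sorted(moq_ends)                   # MoQ series ordered by t_e
    out = []
    # Step (1): initial statistics over T_h = [0, warmup_end).
    stats = Counter(v for v, ts in stream if ts < warmup_end)
    t_h, now = (0, warmup_end), warmup_end
    pos = sum(1 for _, ts in stream if ts < warmup_end)  # stream sorted by ts
    while moq:                               # step (2)
        # Generate-Prb: scale counts from T_h to T_p = [now, MoQ_1.t_e).
        n = sum(stats.values())
        t_p = (now, moq[0])
        fdi = {}
        for k, f in stats.items():
            if f >= lam * n:                 # k is in TopK
                f_e = (t_p[1] - t_p[0]) * f / (t_h[1] - t_h[0])
                fdi[k] = [f_e, 0]            # [f_e, f_r]
                out.append(("Prb", k, t_p, f_e))
        # Processing-within-Tp: filter frequent items, forward the rest.
        stats = Counter()                    # next round's T_h is this T_p
        while pos < len(stream) and stream[pos][1] < t_p[1]:
            r, ts = stream[pos]
            stats[r] += 1
            if r in fdi:
                fdi[r][1] += 1
            else:
                out.append(("raw", r, ts))
            pos += 1
        # Generate-Crb: correct forecasts that violate constraint (1).
        for k, (f_e, f_r) in fdi.items():
            if abs(f_e - f_r) > delta * f_r:
                out.append(("Crb", k, t_p, f_r - f_e))
        moq.pop(0)                           # delete MoQ_1
        t_h, now = t_p, t_p[1]
    return out
```

With $\delta = 0$ every deviation is corrected, so the forecast counts plus correction counts plus forwarded raw items add up exactly to the number of items processed.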

5 Processing Data on Central Stream Processor Node

In this paper, we mainly focus on aggregate queries over distributed data streams. In the query plans of aggregate queries, the input data (including raw non-frequent items and derived-bodies) flows through several intermediate operators (such as project and select) and finally reaches aggregate operators (such as SUM and COUNT), which output the query results. We now discuss in detail how the operators in query plans process derived-bodies.

5.1 Project and select operators

When derived-bodies are taken as input, project and select operators process only the frequent items contained in them. Suppose $d = (k, T_v, Count)$ is a derived-body and $\sigma$ is a project or select operator; then the output $\sigma(d)$ is $(\sigma(k), T_v, Count)$.
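A minimal Python sketch of this rule: the operator touches only the item $k$ and passes $T_v$ and $Count$ through unchanged. The function names are hypothetical, and for select the sketch assumes the usual semantics that a derived-body whose item fails the predicate is dropped (returned as `None`).

```python
from typing import Callable, Hashable, Optional, Tuple

# A derived-body (k, T_v, Count).
DerivedBody = Tuple[Hashable, Tuple[float, float], float]


def project(d: DerivedBody, f: Callable[[Hashable], Hashable]) -> DerivedBody:
    """Apply the projection f to the item only; T_v and Count are unchanged."""
    k, t_v, count = d
    return (f(k), t_v, count)


def select(d: DerivedBody, pred: Callable[[Hashable], bool]) -> Optional[DerivedBody]:
    """Keep the whole derived-body if its item satisfies pred, else drop it."""
    return d if pred(d[0]) else None
```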

5.2 Join operators

At present, most join algorithms over data streams are based on sliding windows [13,16,17,18]. Sliding windows can be defined either in terms of time units or in terms of tuple counts [7]. Our work focuses on time-based sliding windows, but it can also be adapted to tuple-count sliding windows. The join process over sliding windows is divided into three phases [13]: expiring, inserting, and probing and joining. Suppose $T$ is the valid time span of the sliding windows (e.g. the most recent 20 seconds of data) and $Now$ denotes the current system clock. The structure of a join operator is shown in Figure 3(a), where A and B are two data streams and $W_A$ and $W_B$ are the sliding windows over A and B, respectively. Figure 3(b) illustrates the data structure of a sliding window, where the column StreamData stores the data from the data stream and the column Deadline (denoted by DI) stores the deadline of each datum in the sliding window.

Expiring: Data in a sliding window expires when $DI < Now$. Whether the immediate strategy or the timing strategy is used to expire data [13], all items in the sliding window must be scanned, so the time complexity is $O(n)$, where $n$ is the total number of data items in the sliding window.

Inserting: When a new datum $r$ arrives, $r$ is inserted into the StreamData column of the sliding window. If $r$ is a derived-body, its Deadline is set to $r.T_v.t_e + T$; otherwise (i.e. $r$ is a raw datum), its Deadline is set to $r.Timestamp + T$. The time complexity is $O(1)$.
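The expiring and inserting phases can be sketched in Python as a list of entries, each carrying its Deadline (the DI column). The class names `Entry` and `SlidingWindow` are hypothetical; the sketch shows only the two deadline rules and the $O(n)$ scan on expiry, not the probing and joining phase.

```python
from dataclasses import dataclass
from typing import Any, List


@dataclass
class Entry:
    data: Any        # raw datum (k, ts) or derived-body (k, T_v, Count)
    deadline: float  # the DI column


class SlidingWindow:
    def __init__(self, span: float):
        self.T = span                     # valid time span T of the window
        self.entries: List[Entry] = []

    def insert_raw(self, k, ts: float) -> None:
        # Raw datum: Deadline = Timestamp + T, O(1).
        self.entries.append(Entry((k, ts), ts + self.T))

    def insert_derived(self, k, t_v, count: float) -> None:
        # Derived-body: Deadline = T_v.t_e + T, O(1).
        self.entries.append(Entry((k, t_v, count), t_v[1] + self.T))

    def expire(self, now: float) -> None:
        # Keep entries with DI >= Now; scanning every entry costs O(n).
        self.entries = [e for e in self.entries if e.deadline >= now]
```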

Probing and joining: When a new datum $r$ arrives from A (or B), the data $s$ in B (or A) satisfying the join condition are probed and joined with $r$, and the join results are output. We now discuss in detail how $r \bowtie s$ is processed.

[FIGURE 3 OMITTED]

(1). Both r and s are raw data: Suppose $r = \langle k_1, ts_1 \rangle$ and $s = \langle k_2, ts_2 \rangle$; then the output of $r \bowtie s$ is $\langle k_1 \bowtie k_2, maxT \rangle$, where $maxT = \max\{ts_1, ts_2\}$.

(2). Either r or s is a derived-body: Without loss of generality, suppose $r = \langle k_1, ts \rangle$ is a raw datum and $s = (k_2, T_v, C_s)$ is a derived-body; then the output of $r \bowtie s$ is $(k_1 \bowtie k_2, T_j, C_j)$. The values of $T_j$ and $C_j$ are determined by the relationship between $T_v$ and $ts$, as shown in Table 1.

Correct-bodies of frequent items arrive at the central stream processor with a delay. To prevent raw data from expiring too early and missing the chance to join with correct-bodies, we extend the deadline of raw data in sliding windows during probing and joining. The last column of Table 1 shows how the deadline of raw data is modified.

(3) Both r and s are derived-bodies: Without loss of generality, suppose $r = (k_1, T_r, C_r)$, $s = (k_2, T_s, C_s)$ and $T_s.t_b > T_r.t_b$. Then $r \bowtie s$ can produce join results only if $T_s.t_b \le T_r.t_e + T$. The output of $r \bowtie s$ is $(k_1 \bowtie k_2, T_j, C_j)$, where $T_j.t_b = T_s.t_b$, $T_j.t_e = \min\{T_r.t_e + T, T_s.t_e\}$ and

$$C_j = \int_a^b \left(\min\{x + T, T_s.t_e\} - \max\{T_s.t_b, x - T\}\right) \times \frac{C_r \times C_s}{|T_r| \times |T_s|}\, dx$$

The values of $a$ and $b$ are determined by two cases. When $T_r.t_e \le T_s.t_e$ holds, $a$ and $b$ are given in Table 2, and the joining process over derived-bodies is illustrated in Figures 4(a) to 4(h). Otherwise (when $T_r.t_e > T_s.t_e$), $a$ and $b$ are given in Table 3, and the joining process is illustrated in Figures 5(a) to 5(d).

Similarly, to prevent derived-bodies from expiring too early and missing the chance to join with correct-bodies, we extend the deadline of derived-bodies in sliding windows during probing and joining. When $T_r.t_e \le T_s.t_e$ holds, let $r.DI = \max\{T_r.t_e + T, T_s.t_e\}$; otherwise (when $T_r.t_e > T_s.t_e$), let $s.DI = \max\{T_s.t_e + T, T_r.t_e\}$.

Theorem 3. Suppose $r = (k_1, T_r, C_r)$, $s = (k_2, T_s, C_s)$ and $T_r.t_b < T_s.t_b \le T_r.t_e + T$. Then the output of $r \bowtie s$ is $(k_1 \bowtie k_2, T_j, C_j)$, where $T_j.t_b = T_s.t_b$, $T_j.t_e = \min\{T_r.t_e + T, T_s.t_e\}$ and $C_j = \int_a^b \left(\min\{x + T, T_s.t_e\} - \max\{T_s.t_b, x - T\}\right) \times \frac{C_r \times C_s}{|T_r| \times |T_s|}\, dx$, with $a$ and $b$ given by Tables 2 and 3.

Proof: Given $T_r.t_b < T_s.t_b \le T_r.t_e + T$, the relative position of $r$ and $s$ in the sliding windows falls into two cases: $T_r.t_e \le T_s.t_e$ (Figure 4) and $T_r.t_e > T_s.t_e$ (Figure 5). In Figures 4 and 5, solid rectangles denote the lifetimes of $r$ and $s$, and the two skewed dashed lines outline the joining range of $r$ and $s$ under the valid time span $T$ of the sliding windows. We assume that raw frequent items are uniformly distributed within the lifetime of each derived-body. Let the distribution densities of $r$ and $s$ be $p_r$ and $p_s$, respectively; then $p_r = C_r / |T_r|$ and $p_s = C_s / |T_s|$. Consider any valid joining datum $k$ with timestamp $x$ within the lifetime of $r$, and suppose the timestamps of the data in the lifetime of $s$ that can join with $k$ range from $v$ to $u$. Then the number of join results for $k$ is $N_j = (u - v) \times p_s$. Examining all cases in Figures 4 and 5, we obtain $u = \min\{x + T, T_s.t_e\}$ and $v = \max\{T_s.t_b, x - T\}$. Thus, the number of join results produced by all valid data in $r$ joining with the data in $s$ is

$$C_j = \int_a^b N_j \times p_r\, dx = \int_a^b (u - v) \times p_s \times p_r\, dx = \int_a^b \left(\min\{x + T, T_s.t_e\} - \max\{T_s.t_b, x - T\}\right) \times \frac{C_r \times C_s}{|T_r| \times |T_s|}\, dx.$$
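The integral for $C_j$ can be evaluated numerically as a sanity check. The Python sketch below uses the midpoint rule and, as an assumption in place of reading $a$ and $b$ from Tables 2 and 3 (not reproduced here), integrates over the whole lifetime of $r$ while clamping the integrand $u - v$ at zero, so that timestamps $x$ with no joinable data in $s$ contribute nothing. The function name `join_count` is hypothetical.

```python
def join_count(t_r, c_r, t_s, c_s, T, steps=10_000):
    """Numerically evaluate C_j for derived-bodies r = (k1, T_r, C_r), s = (k2, T_s, C_s).

    Assumption: integrate over all of T_r with the integrand clamped at zero
    instead of using the limits a, b from Tables 2 and 3.
    """
    (rb, re), (sb, se) = t_r, t_s
    density = c_r * c_s / ((re - rb) * (se - sb))  # p_r * p_s
    width = (re - rb) / steps
    total = 0.0
    for i in range(steps):
        x = rb + (i + 0.5) * width                 # midpoint of the i-th slice
        u = min(x + T, se)                         # latest joinable timestamp in s
        v = max(sb, x - T)                         # earliest joinable timestamp in s
        total += max(0.0, u - v) * width
    return total * density
```

Two quick checks follow from the formula: with a window span $T$ larger than both lifetimes every pair joins, so $C_j = C_r \times C_s$; and for $T_r = [0, 10]$, $T_s = [10, 20]$, $T = 5$, the integrand is $\max(0, x - 5)$, whose integral over $[0, 10]$ is $12.5$.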

[FIGURE 4 OMITTED]

[FIGURE 5 OMITTED]

5.3 Aggregate operators

Processing derived-bodies in aggregate operators is straightforward: an aggregate operator treats a derived-body $(k, T_v, C)$ as $C$ copies of the same item $k$. Let $db = (k, T_v, C)$ be a derived-body, and let $aggV$ and $aggV'$ denote the aggregate results before and after processing $db$, respectively. The aggregate operators process $db$ as follows.

(1) $aggV'_{COUNT} = aggV_{COUNT} + C$.

(2) $aggV'_{SUM} = aggV_{SUM} + k \times C$.

(3) $aggV'_{AVG} = aggV'_{SUM} / aggV'_{COUNT}$.

(4) $aggV'_{MAX} = \max\{aggV_{MAX}, k\}$.

(5) $aggV'_{MIN} = \min\{aggV_{MIN}, k\}$.
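Rules (1) to (5) translate directly into an incremental aggregate state. In the Python sketch below (class name `Agg` is hypothetical), a raw non-frequent item is handled by treating it as a derived-body with $C = 1$, and a correct-body with a negative $Count$ simply subtracts from COUNT and SUM.

```python
import math
from dataclasses import dataclass


@dataclass
class Agg:
    count: float = 0.0
    total: float = 0.0
    max_v: float = -math.inf
    min_v: float = math.inf

    def add_derived(self, k: float, c: float) -> None:
        """Apply rules (1)-(5) for a derived-body (k, T_v, C)."""
        self.count += c                  # rule (1)
        self.total += k * c              # rule (2)
        self.max_v = max(self.max_v, k)  # rule (4)
        self.min_v = min(self.min_v, k)  # rule (5)

    def add_raw(self, k: float) -> None:
        self.add_derived(k, 1)           # a raw item counts as one copy of k

    @property
    def avg(self) -> float:
        return self.total / self.count   # rule (3)
```

For example, a forecast-body $(5, T_v, 4)$, a raw item $2$ and a correct-body $(5, T_v, -1)$ give COUNT $= 4$, SUM $= 17$ and AVG $= 4.25$.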

6 Experimental Results

We simulate a distributed data stream system and implement our algorithms in VC on a machine with a Pentium 4 2.4 GHz CPU, 256 MB of memory and a 40 GB disk. Two datasets are used: a synthetic dataset following a Zipf distribution and a real network monitoring dataset.

During the experiments, remote data source nodes continuously read data from the datasets. Each remote node attaches a timestamp (an incrementing integer starting from 1) to each datum and sends it to a relay node. Relay nodes run the hCount and eFreq algorithms to obtain the frequent items in the data streams. After transmitting derived-bodies and non-frequent items to the central stream processor, relay nodes filter the arriving frequent items. The data processing of all nodes is simulated with threads.

6.1 Experimental content and data

6.1.1 Experimental content

We mainly examine the reduction in network communication cost when relay nodes transmit derived-bodies instead of raw frequent items. For simplicity, we ignore the details of the transfer protocol and measure the transferred data volume by the number of transferred data items rather than their size, since a derived-body is roughly the same size as a raw item. The total communication cost is therefore the number of raw non-frequent items plus the number of derived-bodies. The relative reduced communication cost is computed as $(RFI - FB - CB) / NRD$, where $RFI$ is the number of raw frequent items, $FB$ is the number of forecast-bodies, $CB$ is the number of correct-bodies, and $NRD$ is the total number of raw data items.
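The cost metric can be written down in a few lines of Python. The function name is hypothetical and the numbers in the check are purely illustrative, not experimental results; the point is that the relative reduced cost equals $1 - \text{sent} / NRD$, since every raw frequent item is replaced by the (far fewer) derived-bodies.

```python
def communication_cost(nrd: int, rfi: int, fb: int, cb: int):
    """Return (messages sent, relative reduced communication cost).

    nrd: total raw data items; rfi: raw frequent items filtered at relay nodes;
    fb / cb: forecast-bodies / correct-bodies transmitted instead.
    """
    sent = (nrd - rfi) + fb + cb        # raw non-frequent items + derived-bodies
    reduced = (rfi - fb - cb) / nrd     # (RFI - FB - CB) / NRD
    return sent, reduced
```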

The experiments examine the main factors that influence the performance (i.e. the relative reduced communication cost) of the RelayNode-Processing algorithm. Since other compression methods cannot adequately support queries over compressed data, we do not compare against them experimentally.

6.1.2 Zipf dataset

Suppose $D$ is the domain of the dataset, and the distinct data in $D$ are arranged in descending order of their occurrence counts in the dataset as $d_1, \dots, d_i, \dots, d_{|D|}$. Let $p_i$ be the relative frequency of the $i$-th datum in this series. The Zipf distribution is then defined by $p_i = \frac{1}{c}\left(\frac{1}{i}\right)^{\alpha}$, where $c = \sum_{j=1}^{|D|} \left(\frac{1}{j}\right)^{\alpha}$ is a normalizing constant and $\alpha$ is called the Zipf index.

In particular, setting $\alpha = 0$ gives a uniform distribution, and $\alpha = 1$ gives the traditional Zipf distribution.
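A synthetic stream in the spirit of DataSet-I can be generated from these probabilities. The Python sketch below assumes that rank $i$ is simply mapped to the value $i$ (the paper does not say how ranks map to values) and uses the standard-library `random.choices` for weighted sampling; the function names are hypothetical.

```python
import random
from typing import List


def zipf_probabilities(domain_size: int, alpha: float) -> List[float]:
    """p_i = (1/c) * (1/i)^alpha for i = 1..|D|, with c = sum_j (1/j)^alpha."""
    weights = [(1.0 / i) ** alpha for i in range(1, domain_size + 1)]
    c = sum(weights)
    return [w / c for w in weights]


def zipf_stream(n: int, domain_size: int, alpha: float, seed: int = 0) -> List[int]:
    """Draw n items from {1..|D|}, where item i has probability p_i."""
    rng = random.Random(seed)
    probs = zipf_probabilities(domain_size, alpha)
    return rng.choices(range(1, domain_size + 1), weights=probs, k=n)
```

Under these assumptions, `zipf_stream(100_000, 10_000, 1.0)` would match the volume, domain and default Zipf index of DataSet-I.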

The Zipf dataset (denoted by DataSet-I) contains 100,000 data items drawn from the domain [1..10000]. The default parameter values are as follows: the Zipf index is 1, the size of the two-dimensional array $S$ used in the hCount algorithm is 30 × 30, the support-degree of frequent items is 0.5%, and the maximum available memory of the FDI structure is 300 entries. We execute 20 queries, and the querying timestamp range of the $j$-th query is from 1 to $5000 \times j$, where $1 \le j \le 20$.

6.1.3 Network monitoring dataset

In the experiments over the network monitoring dataset (denoted DataSet-II) [12], all queries focus on the desthost column of the dataset, and the experimental data volume is 1,000,000. The default parameter values are as follows: the two-dimensional array S in the hCount algorithm is 30 × 30, the support-degree of frequent items is 0.5%, and the maximum available memory of the FDI structure is 200 entries. We execute 500 queries; the querying timestamp range of the jth query is [1, 2000j], where 1 ≤ j ≤ 500.
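The query workloads of both datasets follow the same pattern of nested prefix ranges. A minimal sketch that generates them (the function name is hypothetical) shows that, assuming one data item per timestamp, the last query of each workload covers the whole dataset: 5000 × 20 = 100,000 and 2000 × 500 = 1,000,000.

```python
def query_ranges(num_queries: int, step: int) -> list[tuple[int, int]]:
    # The jth query (1 <= j <= num_queries) covers timestamps [1, step * j]
    return [(1, step * j) for j in range(1, num_queries + 1)]


dataset_i = query_ranges(20, 5000)     # DataSet-I workload
dataset_ii = query_ranges(500, 2000)   # DataSet-II workload
# ASSUMPTION: one data item per timestamp, so the last range spans the whole dataset
print(dataset_i[-1], dataset_ii[-1])
```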

6.2 Support-degree and reduced communication cost

[FIGURE 6 OMITTED]

The relationship between the support-degree of frequent items and the reduced communication cost is shown in Figure 6. As the support-degree increases, the reduced communication cost decreases. The reason is that a higher support-degree makes the eFreq algorithm produce fewer frequent items, which decreases the number of entries in FDI and increases the volume of non-frequent items transferred to the central stream processor node. Although the number of derived-bodies also decreases, this decrease is clearly smaller than the increase in transferred non-frequent items.
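The effect of the support-degree can be illustrated analytically on an ideal Zipf distribution. The sketch below is not the eFreq algorithm: it assumes, hypothetically, that an item is frequent exactly when its expected relative frequency reaches the support-degree, and it ignores estimation error and derived-body overhead. Under the DataSet-I defaults (|D| = 10000, Zipf index 1) it finds 20 frequent items at a support-degree of 0.5% but only 10 at 1%, and the share of the stream they account for, which bounds what relay nodes can filter, shrinks accordingly.

```python
def zipf_probabilities(domain_size: int, alpha: float) -> list[float]:
    # p_i = (1/c) * (1/i)**alpha, normalised so that the probabilities sum to 1
    weights = [(1.0 / i) ** alpha for i in range(1, domain_size + 1)]
    c = sum(weights)
    return [w / c for w in weights]


def frequent_ranks(probs: list[float], support: float) -> list[int]:
    # HYPOTHETICAL criterion: rank i is frequent if p_i >= support-degree
    return [rank for rank, p in enumerate(probs, start=1) if p >= support]


def frequent_share(probs: list[float], support: float) -> float:
    # Fraction of the stream made up of frequent items: an upper bound on the
    # fraction of raw items that relay nodes could replace by derived-bodies
    return sum(p for p in probs if p >= support)


probs = zipf_probabilities(10_000, 1.0)
for support in (0.005, 0.01):
    print(f"support={support:.1%}: {len(frequent_ranks(probs, support))} frequent items, "
          f"share={frequent_share(probs, support):.3f}")
```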

6.3 Available memory size of FDI and reduced communication cost

Figure 7 illustrates how the available memory of FDI influences the reduced communication cost, where the available memory size of FDI is measured by the number of entries it can hold. As the available memory of FDI increases, more frequent items are preserved in FDI, so relay nodes filter more frequent items and less data is transferred to the central stream processor; that is, the reduced communication cost increases. Once FDI is large enough, the reduced communication cost levels off, because the number of distinct frequent items is then smaller than the capacity of FDI.
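The plateau can be seen in a simple capacity model. This sketch assumes, hypothetically, that an FDI with room for k entries keeps the k most frequent of the frequent items, and it reuses idealized Zipf frequencies with the frequency-threshold criterion; it illustrates why the curve flattens and is not the paper's FDI maintenance policy.

```python
def zipf_probabilities(domain_size: int, alpha: float) -> list[float]:
    # Normalised Zipf probabilities p_i = (1/c) * (1/i)**alpha
    weights = [(1.0 / i) ** alpha for i in range(1, domain_size + 1)]
    c = sum(weights)
    return [w / c for w in weights]


def filterable_share(probs: list[float], support: float, capacity: int) -> float:
    # Frequent items under the HYPOTHETICAL p_i >= support criterion, most frequent first
    frequent = sorted((p for p in probs if p >= support), reverse=True)
    # Only `capacity` of them fit into FDI; the rest are forwarded as raw items
    return sum(frequent[:capacity])


probs = zipf_probabilities(10_000, 1.0)
for capacity in (5, 10, 20, 100, 300):
    print(capacity, round(filterable_share(probs, 0.005, capacity), 3))
```

Under these assumptions there are 20 frequent items, so every capacity of 20 entries or more yields the same share, mirroring the levelling-off described above.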

[FIGURE 7 OMITTED]

6.4 Correct-factor and reduced communication cost

[FIGURE 8 OMITTED]

The correct-factor affects the number of correct-bodies transferred to the central stream processor. The larger the correct-factor, the larger the tolerated difference between the estimated occurrence times carried by forecast-bodies and the true occurrence times, so fewer correct-bodies are transferred. The relationship between the correct-factor and the reduced communication cost is illustrated in Figure 8. As the correct-factor increases, the reduced communication cost also increases, but only slightly. The reason is that the number of correct-bodies is small compared with the number of frequent items, so reducing it changes the total cost little.
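The direction of this effect can be illustrated with a toy tolerance rule. The exact definition of the correct-factor is not restated in this section, so the rule below, which sends a correct-body only when the difference between an item's forecast and its true count exceeds correct_factor × forecast, is a hypothetical stand-in used only to show why a larger correct-factor yields fewer correct-bodies. The item names and counts are invented for illustration.

```python
def count_correct_bodies(forecasts: dict[str, int], actuals: dict[str, int],
                         correct_factor: float) -> int:
    """HYPOTHETICAL tolerance rule: a correct-body is needed for an item when
    |actual - forecast| > correct_factor * forecast."""
    return sum(
        1
        for item, forecast in forecasts.items()
        if abs(actuals.get(item, 0) - forecast) > correct_factor * forecast
    )


forecasts = {"a": 100, "b": 200, "c": 50}   # illustrative forecast-body counts
actuals = {"a": 110, "b": 150, "c": 50}     # illustrative true counts
for cf in (0.05, 0.2, 0.3):
    print(cf, count_correct_bodies(forecasts, actuals, cf))
```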

6.5 Size of array S and reduced communication cost

[FIGURE 9 OMITTED]

The performance of the hCount algorithm greatly influences that of the RelayNode-Processing algorithm. As mentioned in [11], the performance of hCount depends on the size of the two-dimensional array S. Figure 9 shows the relationship between the size of array S and the reduced communication cost, where the x-axis is the number of cells in one dimension of S. As Figure 9 shows, increasing the size of S improves the performance of hCount, and the reduced communication cost increases accordingly. Note that the experimental results on DataSet-II show some fluctuation, which may be caused by the unstable data distribution of DataSet-II.
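To see why a larger S helps, consider a Count-Min-style sketch, used here as a stand-in for the array S: each item is hashed into one counter per row, and its count is estimated as the minimum over those counters. This is a minimal sketch under that assumption, not the hCount algorithm of [11], which may differ (for example, in how it corrects estimation error); the class name, hash family and parameters are illustrative. Because counters only grow, estimates never fall below the true counts, and wider rows spread items over more counters, so fewer unrelated items share a cell and the overestimation shrinks.

```python
import random


class CountMinSketch:
    """A depth x width array of counters (illustrative stand-in for hCount's array S)."""

    _PRIME = 2_147_483_647  # Mersenne prime 2**31 - 1 used by the hash family

    def __init__(self, depth: int, width: int, seed: int = 0) -> None:
        rng = random.Random(seed)
        self.width = width
        self.table = [[0] * width for _ in range(depth)]
        # One hash h(x) = ((a*x + b) mod p) mod width per row
        self.hashes = [(rng.randrange(1, self._PRIME), rng.randrange(self._PRIME))
                       for _ in range(depth)]

    def _cells(self, item: int):
        # Yield the (row, column) counter that `item` maps to in each row
        for row, (a, b) in enumerate(self.hashes):
            yield row, ((a * item + b) % self._PRIME) % self.width

    def add(self, item: int, count: int = 1) -> None:
        for row, col in self._cells(item):
            self.table[row][col] += count

    def estimate(self, item: int) -> int:
        # Minimum over rows: never below the true count for non-negative updates
        return min(self.table[row][col] for row, col in self._cells(item))
```

For example, after `sketch = CountMinSketch(depth=30, width=30)` and one `sketch.add(x)` per stream item, `sketch.estimate(x)` is an upper bound on the number of occurrences of `x`.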

6.6 Lifetime span of FDI and reduced communication cost

[FIGURE 10 OMITTED]

Figure 10 illustrates the effect of the lifetime span of FDI on the reduced communication cost. As the lifetime span of FDI increases, the reduced communication cost decreases. The reason is that the longer the lifetime span of FDI, the larger the deviation between the expected and the true data distributions, which degrades the performance of the RelayNode-Processing algorithm. The results in Figure 10 show that the lifetime span of FDI should not be large, which also indirectly confirms the importance of the improvement described in Section 4.4. However, the lifetime span of FDI should not be too small either. Otherwise, the statistics collected for the hCount algorithm will be too small, which reduces the precision of the frequent items produced by the eFreq algorithm and degrades the performance of the RelayNode-Processing algorithm.

6.7 Zipf index and reduced communication cost

We use the Zipf datasets to examine the relationship between the Zipf index and the performance of the RelayNode-Processing algorithm. As shown in Figure 11, the reduced communication cost increases with the Zipf index, which shows that the RelayNode-Processing algorithm is well suited to applications with skewed data distributions.
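The same idealized model illustrates the role of the Zipf index. Under the hypothetical criterion that an item is frequent when its expected relative frequency reaches the support-degree, the sketch below computes the share of the stream taken up by frequent items for several Zipf indexes with the DataSet-I defaults. With α = 0 (uniform) no item reaches 0.5%, and the share grows as the distribution becomes more skewed. It ignores derived-body overhead and estimation error, so it shows only the trend, not the measured costs in Figure 11.

```python
def zipf_probabilities(domain_size: int, alpha: float) -> list[float]:
    # Normalised Zipf probabilities p_i = (1/c) * (1/i)**alpha
    weights = [(1.0 / i) ** alpha for i in range(1, domain_size + 1)]
    c = sum(weights)
    return [w / c for w in weights]


def frequent_share(domain_size: int, alpha: float, support: float) -> float:
    # Share of the stream made up of items whose p_i reaches the support-degree
    # (HYPOTHETICAL frequency-threshold criterion, no estimation error)
    return sum(p for p in zipf_probabilities(domain_size, alpha) if p >= support)


for alpha in (0.0, 1.0, 1.5, 2.0):
    print(alpha, round(frequent_share(10_000, alpha, 0.005), 3))
```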

[FIGURE 11 OMITTED]

7. Summary

In this paper, a new method is proposed to transfer data streams in distributed data stream systems. By replacing frequent items with derived-bodies, the method not only greatly reduces the volume of transferred data but also guarantees the precision of queries. The analytical and experimental results show that our method is easy to implement and widely applicable to distributed data stream systems.

Received 30 Oct. 2004; Reviewed and accepted 30 Jan. 2005

References

[1] Olston, C., Jiang, J., & Widom, J. (2003). Adaptive Filters for Continuous Queries over Distributed Data Streams. Proceedings of the 2003 ACM SIGMOD Int'l Conf. on Management of Data. ACM, 2003. 563-574.

[2] Cherniack, M., Balakrishnan, H., & Balazinska, M. (2003). Scalable Distributed Stream Processing. Online Proceedings of First Biennial Conference on Innovative Data Systems Research (CIDR), 2003.

[3] Babcock, B., Olston, C. (2003). Distributed Top-K Monitoring. Proceedings of the 2003 ACM SIGMOD Int'l Conf. on Management of Data. ACM, 2003. 28-39.

[4] Gibbons, P.B., Tirthapura, S. (2002). Distributed Streams Algorithms for Sliding Windows. Proceedings of the Fourteenth Annual ACM Symposium on Parallel Algorithms and Architectures. 63-72.

[5] Yao, Y., Gehrke, J. (2003). Query Processing for Sensor Networks. Online Proceedings of First Biennial Conference on Innovative Data Systems Research (CIDR), 2003.

[6] Lazaridis, I., Mehrotra, S. (2003). Capturing Sensor-Generated Time Series with Quality Guarantees. Proceedings of the 19th Int'l Conf. on Data Engineering. IEEE Computer Society, 2003. 429-440.

[7] Babcock, B., Babu, S., & Datar, M. (2002). Models and Issues in Data Stream Systems. Proceedings of the Twenty-first ACM SIGACT-SIGMOD-SIGART Symposium on Principles of Database Systems. 1-16.

[8] Madden, S., Franklin, M. J. (2002). Fjording the Stream: An Architecture for Queries over Streaming Sensor Data. Proceedings of the 18th Int'l Conf. on Data Engineering. IEEE Computer Society, 2002. 555-566.

[9] Tian, F., DeWitt, D.J. (2003). Tuple Routing Strategies for Distributed Eddies. Proceedings of 29th International Conference on Very Large Data Bases. 333-344.

[10] Deligiannakis, A., Kotidis, Y., & Roussopoulos, N. (2004). Compressing Historical Information in Sensor Networks. Proceedings of the 2004 ACM SIGMOD Int'l Conf. on Management of Data. ACM, 2004. 527-538.

[11] Jin, C., Qian, W., Sha, C., Yu, J.X., & Zhou, A. (2003). Dynamically Maintaining Frequent Items Over Data Stream. Proceedings of the 2003 ACM CIKM International Conference on Information and Knowledge Management. 287-294.

[12] Paxson, V., Floyd, S. (1995). Wide Area Traffic: The Failure of Poisson Modeling. IEEE/ACM Transactions on Networking, 3(3), 1995. 226-244.

[13] Kang, J., Naughton, J.F., Viglas, S.D. (2003). Evaluating Window Joins over Unbounded Streams. Proceedings of the 19th Int'l Conf. on Data Engineering. IEEE Computer Society, 2003. 341-352.

[14] Arasu, A., Manku, G.S. (2004). Approximate Counts and Quantiles over Sliding Windows. Proceedings of the Twenty-third ACM SIGACT-SIGMOD-SIGART Symposium on Principles of Database Systems. 286-296.

[15] Arasu, A., Babu, S., Widom, J. (2002). An Abstract Semantics and Concrete Language for Continuous Queries over Streams and Relations. Technical Report, Stanford University Database Group. Nov. 2002. Available at http://dbpubs.stanford.edu/pub/2002-57.

[16] Golab, L., Ozsu, M.T. (2003). Processing Sliding Window Multi-Joins in Continuous Queries over Data Streams. Proceedings of 29th International Conference on Very Large Data Bases. 500-511.

[17] Viglas, S.D., Naughton, J.F., & Burger, J. (2003). Maximizing the Output Rate of Multi-Way Join Queries over Streaming Information Sources. Proceedings of 29th International Conference on Very Large Data Bases. 285-296.

[18] Hammad, M.A., Franklin, M.J., Aref, W.G., & Elmagarmid, A.K. (2003). Scheduling for shared window joins over data streams. Proceedings of 29th International Conference on Very Large Data Bases. 297-308.

(1) This work was supported by the National Natural Science Foundation of China under Grant No. 60273082, by the National High-Tech Research and Development Plan of China under Grant No. 2002AA444110, by the National Grand Fundamental Research 973 Program of China under Grant No. G1999032704, and by the Natural Science Foundation of Heilongjiang Province of China under Grant No. zjg03-05.

Dongdong Zhang (1+), Jianzhong Li (1,2), Weiping Wang (1), Longjiang Guo (1,2), Chunyu Ai (2)

(1) School of Computer Science and Technology, Harbin Institute of Technology, China

(2) School of Computer Science and Technology, Heilongjiang University, China {zddhit, lijzh, wpwang}@hit.edu.cn, liguo_1234@sina.com, chunyu_ai@263.net

Table 1 Join results between raw data and derived-bodies Conditions [T.sub.J] [T.sub.v][t.sub.3. [[T.sub.v] x [t.sub.3]], sup.-]T-ts< min (ts+T, [T.sub.v] [T.sub.v][t.sub.3] x [t.sub.e])] [T.sub.v] x [ts, min (ts+T, [t.sub.3]-<ts< [T.sub.v] x [t.sub.e]] [T.sub.v] x [t.sub.e] [T.sub.v] x [ts, ts] [t.sub.e.sup.-]< [t.sub.s]<[T.sub.v] x [t.sub.e]+T Conditions [C.sub.J] [T.sub.v][t.sub.3. min ([C.sub.5], [absolute sup.-]T-ts< value of [T.sub.J]] x [T.sub.v][t.sub.3] [C.sub.5]/[absolute value of [T.sub.v]]) [T.sub.v] x (min (ts+T, [T.sub.v] x [t.sub.3]-<ts< [t.sub.e])-max([T.sub.v] x [T.sub.v] x [t.sub.3], ts-T) x [C.sub.5]/ [t.sub.e] [absolute value of [T.sub.v]] [T.sub.v] x min([C.sub.5][[T.sub.v] x [t.sub.e.sup.-]< [t.sub.e]-(ts-T)]x[C.sub.5]/ [t.sub.s]<[T.sub.v] [absolute value of [T.sub.v]]) x [t.sub.e]+T Conditions r.Dl [T.sub.v][t.sub.3. max (ts+T, [T.sub.v] sup.-]T-ts< x [t.sub.e]) [T.sub.v][t.sub.3] [T.sub.v] x max (ts+T, [T.sub.v] [t.sub.3]-<ts< x [t.sub.e]) [T.sub.v] x [t.sub.e] [T.sub.v] x ts+T [t.sub.e.sup.-]< [t.sub.s]<[T.sub.v] x [t.sub.e]+T Table 2 a and b when [T.sub.r] x [t.sub.e] [less than or equal to] [T.sub.s] x [t.sub.e] [T.sub.y] x [t.sub.3] + T [less than or equal to] [t.sub.5] x [t.sub.3] Ty x te + T [less than a = [T.sub.5] x [t.sub.3] - or equal to] [T.sub.5] T, b = [T.sub.y] x [t.sub.e] x [t.sub.e] Ty x te + T [greater than a = [T.sub.5] x [t.sub.3] - or equal to] [T.sub.5] T, b = [T.sub.y] x [t.sub.e] x [t.sub.e] [T.sub.y] x [t.sub.3] + T [greater than or equal to] [t.sub.5] x [t.sub.3] Ty x te + T [less than a = [T.sub.5] x [t.sub.3] or equal to] [T.sub.5] b = [T.sub.y] x [t.sub.e] x [t.sub.e] Ty x te + T [greater than a = [T.sub.5] x [t.sub.3] or equal to] [T.sub.5] b = [T.sub.y] x [t.sub.e] x [t.sub.e] Table 3 a and b when [T.sub.r] x [t.sub.e] [greater than or equal to] [T.sub.s] x [t.sub.e] [T.sub.y] x [t.sub.3] + T [less than or equal to] [T.sub.s] x [t.sub.e] [T.sub.y] x [t.sub.e] a = [T.sub.s] x [t.sub.3] - T [less than or - T, b = 
[T.sub.y] x equal to] [T.sub.s] [t.sub.e] x [t.sub.e] [T.sub.y] x [t.sub.e] a = [T.sub.s] x [t.sub.3] - T [greater than or - T, b = [T.sub.s] x equal to] [T.sub.s] [t.sub.e]+T x [t.sub.e] [T.sub.y] x [t.sub.3] + T [greater than or equal to] [T.sub.s] x [t.sub.e] [T.sub.y] x [t.sub.e] a = [T.sub.s] x [t.sub.3] - T [less than or b = [T.sub.y] x [t.sub.e] equal to] [T.sub.s] x [t.sub.e] [T.sub.y] x [t.sub.e] a = [T.sub.s] x [t.sub.3] - T [greater than or b = [T.sub.s] x [t.sub.e]+T equal to] [T.sub.s] x [t.sub.e]


Author: Zhang, Dongdong; Li, Jianzhong; Wang, Weiping; Guo, Longjiang; Ai, Chunyu

Publication: Journal of Digital Information Management

Date: Jun 1, 2005

Words: 7851
